You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 20:12:26 UTC
[49/50] [abbrv] incubator-ignite git commit: #[IGNITE-218]: mapred
execution in correct user context.
#[IGNITE-218]: mapred execution in correct user context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d32feeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d32feeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d32feeb
Branch: refs/heads/ignite-218
Commit: 4d32feeb0d206ce6edd10b7e649dca928804bd74
Parents: 1b7cead
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 21:01:35 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 21:01:35 2015 +0300
----------------------------------------------------------------------
config/hadoop/default-config.xml | 3 +
.../processors/hadoop/HadoopTaskContext.java | 11 ++
.../internal/processors/igfs/IgfsUtils.java | 4 -
.../fs/IgniteHadoopFileSystemCounterWriter.java | 2 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 16 +--
.../processors/hadoop/HadoopDefaultJobInfo.java | 2 +-
.../internal/processors/hadoop/HadoopUtils.java | 53 +++++++++
.../hadoop/SecondaryFileSystemProvider.java | 2 +-
.../hadoop/fs/HadoopLazyConcurrentMap.java | 5 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 111 ++++++++++---------
.../processors/hadoop/v2/HadoopV2Job.java | 2 +-
.../hadoop/v2/HadoopV2JobResourceManager.java | 4 +-
.../hadoop/v2/HadoopV2TaskContext.java | 51 +++++++++
.../hadoop/HadoopClientProtocolSelfTest.java | 6 +-
.../HadoopDefaultMapReducePlannerSelfTest.java | 2 -
.../processors/hadoop/HadoopMapReduceTest.java | 15 ++-
.../hadoop/HadoopTasksAllVersionsTest.java | 15 ++-
.../processors/hadoop/HadoopTasksV1Test.java | 5 +-
.../processors/hadoop/HadoopTasksV2Test.java | 5 +-
.../processors/hadoop/HadoopV2JobSelfTest.java | 6 +-
.../hadoop/examples/HadoopWordCount2.java | 24 +---
.../collections/HadoopAbstractMapTest.java | 16 +++
22 files changed, 238 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index 30413b0..c1e4855 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -90,6 +90,9 @@
Configuration of Ignite node.
-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <!-- Temporary workaround for tests: -->
+ <property name="localHost" value="127.0.0.1"/>
+
<!--
Apache Hadoop Accelerator configuration.
-->
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..47c55bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Task context.
@@ -187,4 +188,14 @@ public abstract class HadoopTaskContext {
* @throws IgniteCheckedException If failed.
*/
public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+ /**
+ *
+ * @param user
+ * @param callable
+ * @param <T>
+ * @return
+ * @throws IgniteCheckedException
+ */
+ public abstract <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 71a1dbe..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -92,11 +92,7 @@ public class IgfsUtils {
}
/**
-<<<<<<< HEAD
- * Provides non-null interned user name.
-=======
* Provides non-null user name.
->>>>>>> 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b
* If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
* which is the current process owner user.
* @param user a user name to be fixed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 821acdb..cb4f19b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -54,7 +54,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
@Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
throws IgniteCheckedException {
- Configuration hadoopCfg = new Configuration();
+ Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
hadoopCfg.set(e.getKey(), e.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index c0a9ade..bbb8c5f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -179,14 +179,14 @@ public class IgniteHadoopFileSystem extends FileSystem {
public static String getFsHadoopUser(Configuration cfg) throws IOException {
String user = null;
- // -------------------------------------------
- // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
- // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
- // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
- // ugi.doAs() closure.
- if (cfg != null)
- user = cfg.get(MRJobConfig.USER_NAME);
- // -------------------------------------------
+// // -------------------------------------------
+// // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
+// // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
+// // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
+// // ugi.doAs() closure.
+// if (cfg != null)
+// user = cfg.get(MRJobConfig.USER_NAME);
+// // -------------------------------------------
if (user == null) {
UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
synchronized (HadoopDefaultJobInfo.class) {
if ((jobCls0 = jobCls) == null) {
- HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+ HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index d493bd4..ca3a6c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -304,6 +304,59 @@ public class HadoopUtils {
}
/**
+ *
+ * @return
+ */
+ static Configuration checkPreconditionAndCreateConfiguration() {
+ ClassLoader confCl = Configuration.class.getClassLoader();
+
+ ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
+
+ if (ctxCl != null && confCl != ctxCl)
+ throw new IllegalStateException("Wrong classloader: " + confCl + " != " + ctxCl);
+
+ return new Configuration();
+ }
+
+ /**
+ * For diagnostic & test purposes.
+ * @param c the Configuration to check.
+ */
+ static void checkConfiguration(Configuration c) {
+ String name = Configuration.class.getName();
+
+ c.set("xxx", name);
+
+ Class clazz = c.getClass("xxx", null);
+
+ c.unset("xxx");
+
+ if (clazz != c.getClass())
+ throw new IllegalStateException("Wrong configuration.");
+ }
+
+ /**
+ *
+ * @return
+ */
+ public static Configuration safeCreateConfiguration() {
+ final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+ Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+ try {
+ Configuration c = checkPreconditionAndCreateConfiguration();
+
+ checkConfiguration(c);
+
+ return c;
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(cl0);
+ }
+ }
+
+ /**
* Constructor.
*/
private HadoopUtils() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index b1a057c..f9a68f1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -34,7 +34,7 @@ import java.security.*;
*/
public class SecondaryFileSystemProvider {
/** Configuration of the secondary filesystem, never null. */
- private final Configuration cfg = new Configuration();
+ private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
/** The secondary filesystem URI, never null. */
private final URI uri;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index f1c10b6..71b38c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,13 +18,12 @@
package org.apache.ignite.internal.processors.hadoop.fs;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
import org.jsr166.*;
+import java.io.*;
import java.util.*;
import java.util.concurrent.*;
-import org.apache.ignite.internal.util.future.*;
-
-import java.io.*;
import java.util.concurrent.locks.*;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 1919fc4..ac141a9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -17,21 +17,14 @@
package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import java.io.*;
-import java.security.*;
import java.util.*;
import java.util.concurrent.*;
@@ -111,56 +104,76 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
user = IgfsUtils.fixUserName(user);
- String ugiUser;
- try {
- UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
-
- ugiUser = currUser.getShortUserName();
- }
- catch (IOException ioe) {
- throw new IgniteCheckedException(ioe);
- }
-
- if (F.eq(user, ugiUser))
- // if current UGI context user is the same, do direct call:
- return callImpl();
- else {
- // do the call in the context of 'user':
- try {
- final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-
- UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user);
-
- return ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override public Void run() throws IgniteCheckedException {
- return callImpl();
- }
- });
- } catch (IOException | InterruptedException e) {
- throw new IgniteCheckedException(e);
- }
- }
+ return callImpl(user);
+
+// String ugiUser;
+// try {
+// UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+//
+// ugiUser = currUser.getShortUserName();
+// }
+// catch (IOException ioe) {
+// throw new IgniteCheckedException(ioe);
+// }
+//
+// if (F.eq(user, ugiUser))
+// // if current UGI context user is the same, do direct call:
+// return callImpl();
+// else {
+// // do the call in the context of 'user':
+// try {
+// final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+//
+// UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user);
+//
+// return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+// @Override public Void run() throws IgniteCheckedException {
+// return callImpl();
+// }
+// });
+// } catch (IOException | InterruptedException e) {
+// throw new IgniteCheckedException(e);
+// }
+// }
}
+// /**
+// * Gets the job property.
+// */
+// private String getJobProperty(String key) {
+// if (job instanceof HadoopV2Job) {
+// Configuration conf = ((HadoopV2Job)job).jobConf();
+//
+// return conf.get(key);
+// }
+// else
+// return job.info().property(key);
+// }
+
/**
- * Gets the job property.
+ * Runnable task call implementation
+ * @return null.
+ * @throws IgniteCheckedException
*/
- private String getJobProperty(String key) {
- if (job instanceof HadoopV2Job) {
- Configuration conf = ((HadoopV2Job)job).jobConf();
+ Void callImpl(final String user) throws IgniteCheckedException {
+ assert user != null;
- return conf.get(key);
- }
- else
- return job.info().property(key);
+ ctx = job.getTaskContext(info);
+
+ return ctx.runAs(user, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ runTaskImpl();
+
+ return null;
+ }
+ });
}
/**
- * Runnable task call implementation
- * @return null.
+ * Implements actual task running.
* @throws IgniteCheckedException
*/
- Void callImpl() throws IgniteCheckedException {
+ void runTaskImpl() throws IgniteCheckedException {
execStartTs = U.currentTimeMillis();
Throwable err = null;
@@ -170,8 +183,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
HadoopPerformanceCounter perfCntr = null;
try {
- ctx = job.getTaskContext(info);
-
perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
perfCntr.onTaskSubmit(info, submitTs);
@@ -218,8 +229,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
if (ctx != null)
ctx.cleanupTaskEnvironment();
}
-
- return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index a4a7b25..3d47960 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -205,7 +205,7 @@ public class HadoopV2Job implements HadoopJob {
// Note that the classloader identified by the task it was initially created for,
// but later it may be reused for other tasks.
HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
- "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+ "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 7dcd10b..7bc0fb0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -163,7 +163,6 @@ public class HadoopV2JobResourceManager {
stagingDir = new Path(new URI(mrDir));
if (download) {
- // TODO: Out of bounds.
try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) {
if (!fs.exists(stagingDir))
throw new IgniteCheckedException("Failed to find map-reduce " +
@@ -199,7 +198,8 @@ public class HadoopV2JobResourceManager {
}
}
else if (!jobLocDir.mkdirs())
- throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+ throw new IgniteCheckedException("Failed to create local job directory: "
+ + jobLocDir.getAbsolutePath());
setLocalFSWorkingDirectory(jobLocDir);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 9086874..0bbe1d7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,16 +28,20 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.io.*;
+import java.security.*;
import java.util.*;
+import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -447,4 +451,51 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
throw new IgniteCheckedException(e);
}
}
+
+ @Override public <T> T runAs(final String user, final Callable<T> callable) throws IgniteCheckedException {
+ String ugiUser;
+ try {
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+ ugiUser = currUser.getShortUserName();
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe);
+ }
+
+ try {
+ if (F.eq(user, ugiUser))
+ // if current UGI context user is the same, do direct call:
+ return callable.call();
+ else {
+ // do the call in the context of 'user':
+// final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+//
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ @Override public T run() throws Exception {
+ return callable.call();
+ }
+ });
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+// /**
+// * Gets the job property.
+// */
+// private String getJobProperty(String key) {
+// if (job instanceof HadoopV2Job) {
+// Configuration conf = ((HadoopV2Job)job).jobConf();
+//
+// return conf.get(key);
+// }
+// else
+// return job.info().property(key);
+// }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
import org.apache.ignite.hadoop.mapreduce.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
* @return Configuration.
*/
private Configuration config(int port) {
- Configuration conf = new Configuration();
+ Configuration conf = HadoopUtils.safeCreateConfiguration();
setupFileSystems(conf);
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
ctx.getCounter(TestCounter.COUNTER2).increment(1);
int sum = 0;
- for (IntWritable value : values) {
+ for (IntWritable value : values)
sum += value.get();
- }
ctx.write(key, new IntWritable(sum));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 30babe3..f3c74fc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -1000,7 +1000,5 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
@Override public GridKernalContext context() {
return null;
}
-
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..f96eb74 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.*;
@@ -105,14 +106,16 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
checkJobStatistics(jobId);
+ final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
- useNewReducer,
+ useNewReducer,
"blue\t200000\n" +
- "green\t150000\n" +
- "red\t100000\n" +
- "yellow\t70000\n",
- readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
- );
+ "green\t150000\n" +
+ "red\t100000\n" +
+ "yellow\t70000\n",
+ readAndSortFile(outFile)
+ );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.io.*;
import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Hadoop job.
* @throws IOException If fails.
*/
- public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+ public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
/**
* @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
igfs.info(inFile).length() - fileBlock1.length());
- HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Context with mock output.
* @throws IgniteCheckedException If fails.
*/
- private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+ private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
int taskNum, String... words) throws IgniteCheckedException {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @throws Exception If fails.
*/
public void testReduceTask() throws Exception {
- HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @throws Exception If fails.
*/
public void testCombinerTask() throws Exception {
- HadoopV2Job gridJob = getHadoopJob("/", "/");
+ HadoopJob gridJob = getHadoopJob("/", "/");
HadoopTestTaskContext ctx =
runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
* @return Context of combine task with mock output.
* @throws IgniteCheckedException If fails.
*/
- private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+ private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
throws IgniteCheckedException {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
- HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+ HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.hadoop.mapred.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.io.*;
import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
* @return Hadoop job.
* @throws IOException If fails.
*/
- @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
- return new HadoopV2Job(jobId, jobInfo, log);
+ return jobInfo.createJob(jobId, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
import java.util.*;
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
* @return Hadoop job.
* @throws Exception if fails.
*/
- @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
Job job = Job.getInstance();
job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
- return new HadoopV2Job(jobId, jobInfo, log);
+ return jobInfo.createJob(jobId, log);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
cfg.setMapOutputValueClass(Text.class);
cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
- HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+ HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+ HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+ HadoopJob job = info.createJob(id, log);
HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
null));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 8637a4e..dc68df7 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -17,13 +17,11 @@
package org.apache.ignite.internal.processors.hadoop.examples;
-import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
-import org.apache.ignite.internal.processors.hadoop.*;
import java.io.*;
@@ -46,24 +44,6 @@ public class HadoopWordCount2 {
Job job = getJob(args[0], args[1]);
job.submit();
-
- printCounters(job);
-
- job.waitForCompletion(true);
-
- printCounters(job);
- }
-
- private static void printCounters(Job job) throws IOException {
- Counters counters = job.getCounters();
-
- for (CounterGroup group : counters) {
- System.out.println("Group: " + group.getDisplayName() + "," + group.getName());
- System.out.println(" number of counters: " + group.size());
- for (Counter counter : group) {
- System.out.println(" - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue());
- }
- }
}
/**
@@ -75,9 +55,7 @@ public class HadoopWordCount2 {
* @throws IOException If fails.
*/
public static Job getJob(String input, String output) throws IOException {
- Configuration cfg = HadoopStartup.configuration();
-
- Job job = Job.getInstance(cfg);
+ Job job = Job.getInstance();
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..c894b76 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
import org.jetbrains.annotations.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Abstract class for maps test.
@@ -98,6 +99,21 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
@Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
assert false;
}
+
+ /**
+ * @param user
+ * @param callable
+ * @return
+ * @throws IgniteCheckedException
+ */
+ @Override public <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException {
+ try {
+ return callable.call();
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
}
/**