You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/01 11:23:51 UTC
[4/5] incubator-ignite git commit: #[IGNITE-218]: save temporary
state for review.
#[IGNITE-218]: save temporary state for review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56cb251f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56cb251f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56cb251f
Branch: refs/heads/ignite-218
Commit: 56cb251fa11c23f4390e049c455d0f64388be8a0
Parents: c79d26a
Author: iveselovskiy <iv...@gridgain.com>
Authored: Mon Jun 1 12:22:42 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Mon Jun 1 12:22:42 2015 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopTaskContext.java | 18 ++--
.../fs/IgniteHadoopFileSystemCounterWriter.java | 7 +-
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 24 ++---
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 2 +-
.../internal/processors/hadoop/HadoopUtils.java | 1 +
.../hadoop/v2/HadoopV2JobResourceManager.java | 6 +-
.../hadoop/v2/HadoopV2TaskContext.java | 24 +----
.../hadoop/HadoopAbstractSelfTest.java | 1 -
.../processors/hadoop/HadoopMapReduceTest.java | 94 +++++++++++++++++++-
.../testsuites/IgniteHadoopTestSuite.java | 2 +-
10 files changed, 122 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 47c55bd..d0fafc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -190,12 +190,14 @@ public abstract class HadoopTaskContext {
public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
/**
- *
- * @param user
- * @param callable
- * @param <T>
- * @return
- * @throws IgniteCheckedException
- */
- public abstract <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException;
+ * Executes a callable on behalf of the specified user.
+ * In case of embedded task execution the implementation of this method
+ * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with.
+ * @param user The user name.
+ * @param c The callable.
+ * @param <T> The return type of the Callable.
+ * @return The result of the callable.
+ * @throws IgniteCheckedException On any error in callable.
+ */
+ public abstract <T> T runAs(String user, Callable<T> c) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index cb4f19b..bbafcd7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import java.io.*;
@@ -39,9 +40,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
/** */
- private static final String DEFAULT_USER_NAME = "anonymous";
-
- /** */
public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
/** */
@@ -61,8 +59,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
String user = jobInfo.user();
- if (F.isEmpty(user))
- user = DEFAULT_USER_NAME;
+ user = IgfsUtils.fixUserName(user);
String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index bbb8c5f..328120b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.security.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
@@ -173,27 +172,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
/**
- * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+ * Gets non-null user name as per the Hadoop file system viewpoint.
* @return the user name, never null.
*/
- public static String getFsHadoopUser(Configuration cfg) throws IOException {
+ public static String getFsHadoopUser() throws IOException {
String user = null;
-// // -------------------------------------------
-// // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
-// // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
-// // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
-// // ugi.doAs() closure.
-// if (cfg != null)
-// user = cfg.get(MRJobConfig.USER_NAME);
-// // -------------------------------------------
+ UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
- if (user == null) {
- UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
-
- if (currUgi != null)
- user = currUgi.getShortUserName();
- }
+ if (currUgi != null)
+ user = currUgi.getShortUserName();
user = IgfsUtils.fixUserName(user);
@@ -242,7 +230,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
uriAuthority = uri.getAuthority();
- user = getFsHadoopUser(cfg);
+ user = getFsHadoopUser();
// Override sequential reads before prefetch if needed.
seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index f3fbe9c..8330143 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
uri = name;
- user = getFsHadoopUser(cfg);
+ user = getFsHadoopUser();
try {
initialize(name, cfg);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index ca3a6c5..12015af 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -303,6 +303,7 @@ public class HadoopUtils {
return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
}
+ // TODO: after disagniostic & testing leave only one method "safeCreateConfiguration()"
/**
*
* @return
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 7bc0fb0..8f1e1ab 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -95,7 +95,7 @@ public class HadoopV2JobResourceManager {
}
/**
- * Gets non-null and interned user name as per the Hadoop viewpoint.
+ * Gets non-null user name as per the Hadoop viewpoint.
* @param cfg the Hadoop job configuration, may be null.
* @return the user name, never null.
*/
@@ -103,7 +103,9 @@ public class HadoopV2JobResourceManager {
String user = cfg.get(MRJobConfig.USER_NAME);
if (user == null)
- user = IgniteHadoopFileSystem.getFsHadoopUser(cfg);
+ user = IgniteHadoopFileSystem.getFsHadoopUser();
+
+ X.println("##### Mr user = [" + user + "]"); // TODO: remove
return user;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 0bbe1d7..ee10b01 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -452,7 +452,8 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
}
}
- @Override public <T> T runAs(final String user, final Callable<T> callable) throws IgniteCheckedException {
+ /** {@inheritDoc} */
+ @Override public <T> T runAs(final String user, final Callable<T> c) throws IgniteCheckedException {
String ugiUser;
try {
UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
@@ -466,16 +467,13 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
try {
if (F.eq(user, ugiUser))
// if current UGI context user is the same, do direct call:
- return callable.call();
+ return c.call();
else {
- // do the call in the context of 'user':
-// final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-//
UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
return ugi.doAs(new PrivilegedExceptionAction<T>() {
@Override public T run() throws Exception {
- return callable.call();
+ return c.call();
}
});
}
@@ -484,18 +482,4 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
throw new IgniteCheckedException(e);
}
}
-
-// /**
-// * Gets the job property.
-// */
-// private String getJobProperty(String key) {
-// if (job instanceof HadoopV2Job) {
-// Configuration conf = ((HadoopV2Job)job).jobConf();
-//
-// return conf.get(key);
-// }
-// else
-// return job.info().property(key);
-// }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..f41eb17 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index f96eb74..7d09433 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,13 +24,13 @@ import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.hadoop.fs.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.*;
@@ -215,4 +215,96 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
}
}
+
+// /**
+// * Startup secondary file system.
+// *
+// * @throws Exception If failed.
+// */
+// private void startUpSecondary() throws Exception {
+// FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+//
+// igfsCfg.setDataCacheName("partitioned");
+// igfsCfg.setMetaCacheName("replicated");
+// igfsCfg.setName("igfs-secondary");
+// igfsCfg.setBlockSize(512 * 1024);
+// igfsCfg.setDefaultMode(PRIMARY);
+//
+// IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+//
+// endpointCfg.setType(IgfsIpcEndpointType.TCP);
+// endpointCfg.setPort(11500);
+//
+// igfsCfg.setIpcEndpointConfiguration(endpointCfg);
+//
+// CacheConfiguration cacheCfg = defaultCacheConfiguration();
+//
+// cacheCfg.setName("partitioned");
+// cacheCfg.setCacheMode(PARTITIONED);
+// cacheCfg.setNearConfiguration(null);
+// cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+// cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
+// cacheCfg.setBackups(0);
+// cacheCfg.setAtomicityMode(TRANSACTIONAL);
+//
+// CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+//
+// metaCacheCfg.setName("replicated1");
+// metaCacheCfg.setCacheMode(REPLICATED);
+// metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+// metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+//
+// IgniteConfiguration cfg = new IgniteConfiguration();
+//
+// cfg.setGridName("igfs-grid-secondary");
+//
+// TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+//
+// discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+//
+// cfg.setDiscoverySpi(discoSpi);
+// cfg.setCacheConfiguration(metaCacheCfg, cacheCfg);
+// cfg.setFileSystemConfiguration(igfsCfg);
+//
+// cfg.setLocalHost("127.0.0.1");
+//
+// G.start(cfg);
+// }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+// startUpSecondary();
+
+ super.beforeTest();
+ }
+
+ /**
+ * @return IGFS configuration.
+ */
+ @Override public FileSystemConfiguration igfsConfiguration() {
+
+ FileSystemConfiguration fsCfg = super.igfsConfiguration();
+//
+// fsCfg.setName("igfs-secondary");
+// fsCfg.setDefaultMode(PRIMARY);
+//
+// IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
+//
+// endpointCfg.setType(IgfsIpcEndpointType.TCP);
+// endpointCfg.setPort(11500);
+//
+// fsCfg.setIpcEndpointConfiguration(endpointCfg);
+//
+// try {
+//
+// fsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+// "igfs://igfs-secondary:igfs-grid-secondary@127.0.0.1:11500/",
+// "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
+// }
+// catch (Exception e) {
+// throw new IgniteException(e);
+// }
+
+ return fsCfg;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56cb251f/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 149decc..179f7f0 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -48,7 +48,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
downloadHadoop();
downloadHive();
- ClassLoader ldr = new HadoopClassLoader(null, "test"); // TestSuite.class.getClassLoader();
+ HadoopClassLoader ldr = new HadoopClassLoader(null, "test");
TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");