You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/26 07:17:14 UTC
[18/46] ignite git commit: IGNITE-3953: Hadoop: Merged back both
modules.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
new file mode 100644
index 0000000..3731b2c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Splitter;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+
+/**
+ * Hadoop job implementation for v2 API.
+ */
+public class HadoopV2Job implements HadoopJob {
+ /** */
+ private final JobConf jobConf;
+
+ /** */
+ private final JobContextImpl jobCtx;
+
+ /** */
+ private final HadoopHelper helper;
+
+ /** Hadoop job ID. */
+ private final HadoopJobId jobId;
+
+ /** Job info. */
+ protected final HadoopJobInfo jobInfo;
+
+ /** Native library names. */
+ private final String[] libNames;
+
+ /** */
+ private final JobID hadoopJobID;
+
+ /** */
+ private final HadoopV2JobResourceManager rsrcMgr;
+
+ /** */
+ private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs =
+ new ConcurrentHashMap8<>();
+
+ /** Pooling task context class and thus class loading environment. */
+ private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+
+ /** All created contexts. */
+ private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
+
+ /** File system cache map. */
+ private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap();
+
+ /** Local node ID */
+ private volatile UUID locNodeId;
+
+ /** Serialized JobConf. */
+ private volatile byte[] jobConfData;
+
+ /**
+ * Creates the job: builds the Hadoop {@link JobConf} from the job info properties and creates the
+ * Hadoop job context and the job resource manager.
+ *
+ * @param jobId Job ID.
+ * @param jobInfo Job info.
+ * @param log Logger.
+ * @param libNames Optional additional native library names.
+ * @param helper Hadoop helper; passed on to the task class loaders created in
+ * {@link #getTaskContext(HadoopTaskInfo)}.
+ */
+ public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log,
+ @Nullable String[] libNames, HadoopHelper helper) {
+ assert jobId != null;
+ assert jobInfo != null;
+
+ this.jobId = jobId;
+ this.jobInfo = jobInfo;
+ this.libNames = libNames;
+ this.helper = helper;
+ // Run the Hadoop set-up below with this class' loader as the thread context class loader.
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+ try {
+ hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
+
+ jobConf = new JobConf();
+
+ HadoopFileSystemsUtils.setupFileSystems(jobConf);
+ // Copy every job property into the Hadoop configuration.
+ for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
+ jobConf.set(e.getKey(), e.getValue());
+
+ jobCtx = new JobContextImpl(jobConf, hadoopJobID);
+
+ rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this);
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The job ID this job was constructed with.
+ */
+ @Override public HadoopJobId id() {
+ return jobId;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The job info this job was constructed with.
+ */
+ @Override public HadoopJobInfo info() {
+ return jobInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf.getClassLoader());
+
+ try {
+ String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+
+ if (jobDirPath == null) { // Probably job was submitted not by hadoop client.
+ // Assume that we have needed classes and try to generate input splits ourself.
+ if (jobConf.getUseNewMapper())
+ return HadoopV2Splitter.splitJob(jobCtx);
+ else
+ return HadoopV1Splitter.splitJob(jobConf);
+ }
+
+ Path jobDir = new Path(jobDirPath);
+
+ try {
+ FileSystem fs = fileSystem(jobDir.toUri(), jobConf);
+
+ JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
+ jobDir);
+
+ if (F.isEmpty(metaInfos))
+ throw new IgniteCheckedException("No input splits found.");
+
+ Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir);
+
+ try (FSDataInputStream in = fs.open(splitsFile)) {
+ Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length);
+
+ for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
+ long off = metaInfo.getStartOffset();
+
+ String[] hosts = metaInfo.getLocations();
+
+ in.seek(off);
+
+ String clsName = Text.readString(in);
+
+ HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts);
+
+ if (block == null)
+ block = HadoopV2Splitter.readFileBlock(clsName, in, hosts);
+
+ res.add(block != null ? block : new HadoopExternalSplit(hosts, off));
+ }
+
+ return res;
+ }
+ }
+ catch (Throwable e) {
+ if (e instanceof Error)
+ throw (Error)e;
+ else
+ throw transformException(e);
+ }
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * At most one context is created per (task type, task number) pair: the first caller registers a future
+ * in {@code ctxs} and creates the context, other callers wait on that future. A creation failure
+ * completes the future with the error, so later calls for the same task fail with it as well for as long
+ * as the entry stays in the map.
+ */
+ @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" })
+ @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
+ T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber());
+ // Fast path: a context for this task is already registered (it may still be under construction).
+ GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId);
+
+ if (fut != null)
+ return fut.get();
+ // Register our own future; if another thread won the race, wait for its result instead.
+ GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>());
+
+ if (old != null)
+ return old.get();
+ // Prefer a pooled context class (returned to the pool by cleanupTaskEnvironment) over loading a new one.
+ Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
+
+ try {
+ if (cls == null) {
+ // If there is no pooled class, then load a new one.
+ // Note that the class loader is named after the task it was initially created for,
+ // but later it may be reused for other tasks.
+ HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
+ HadoopClassLoader.nameForTask(info, false), libNames, helper);
+
+ cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
+ // Remember every loaded class so that dispose() can clean up after its class loader.
+ fullCtxClsQueue.add(cls);
+ }
+
+ Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class,
+ HadoopJobId.class, UUID.class, DataInput.class);
+ // Serialize the job configuration once (double-checked on the volatile field); contexts read it from the bytes.
+ if (jobConfData == null)
+ synchronized(jobConf) {
+ if (jobConfData == null) {
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ jobConf.write(new DataOutputStream(buf));
+
+ jobConfData = buf.toByteArray();
+ }
+ }
+
+ HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId,
+ new DataInputStream(new ByteArrayInputStream(jobConfData)));
+
+ fut.onDone(res);
+
+ return res;
+ }
+ catch (Throwable e) {
+ IgniteCheckedException te = transformException(e);
+ // Propagate the failure to the threads waiting on the future.
+ fut.onDone(te);
+
+ if (e instanceof Error)
+ throw (Error)e;
+
+ throw te;
+ }
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Remembers the local node ID and lets the resource manager prepare the job environment in the
+     * job-local directory. Resources are downloaded only when the job is not {@code external}.
+     */
+    @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+        assert locNodeId != null;
+
+        this.locNodeId = locNodeId;
+
+        ClassLoader prevLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            boolean download = !external;
+
+            File jobLocDir = jobLocalDir(locNodeId, jobId);
+
+            rsrcMgr.prepareJobEnvironment(download, jobLocDir);
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(prevLdr);
+        }
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Unless {@code external}, deletes the job-local directory. Then, in any case, stops the Hadoop daemons
+ * and closes the cached file systems for every task class loader created by this job, and finally closes
+ * the job-level file system cache. The first failure is remembered and rethrown after the clean-up.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ @Override public void dispose(boolean external) throws IgniteCheckedException {
+ try {
+ if (rsrcMgr != null && !external) {
+ File jobLocDir = jobLocalDir(locNodeId, jobId);
+
+ if (jobLocDir.exists())
+ U.delete(jobLocDir);
+ }
+ }
+ finally {
+ taskCtxClsPool.clear();
+
+ Throwable err = null;
+
+ // Stop the daemon threads that have been created
+ // with the task class loaders:
+ while (true) {
+ Class<? extends HadoopTaskContext> cls = fullCtxClsQueue.poll();
+
+ if (cls == null)
+ break;
+
+ try {
+ final ClassLoader ldr = cls.getClassLoader();
+
+ try {
+ // Stop Hadoop daemons for this *task*:
+ stopHadoopFsDaemons(ldr);
+ }
+ catch (Exception e) {
+ if (err == null)
+ err = e;
+ }
+
+ // Also close all the FileSystems cached in
+ // HadoopLazyConcurrentMap for this *task* class loader:
+ closeCachedTaskFileSystems(ldr);
+ }
+ catch (Throwable e) {
+ if (err == null)
+ err = e;
+ // NOTE(review): rethrowing an Error here leaves the finally block at once, so the remaining class loaders and fsMap.close() are skipped.
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+
+ assert fullCtxClsQueue.isEmpty();
+
+ try {
+ // Close all cached file systems for this *Job*:
+ fsMap.close();
+ }
+ catch (Exception e) {
+ if (err == null)
+ err = e;
+ }
+ // Report the first recorded failure only after all clean-up steps have been attempted.
+ if (err != null)
+ throw U.cast(err);
+ }
+ }
+
+    /**
+     * Stops Hadoop Fs daemon threads by reflectively invoking {@code dequeueAndStopAll()}, without a
+     * receiver object, on the daemon class loaded through the given class loader.
+     *
+     * @param ldr The task ClassLoader to stop the daemons for.
+     * @throws Exception On error.
+     */
+    private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception {
+        ldr.loadClass(HadoopClassLoader.CLS_DAEMON)
+            .getMethod("dequeueAndStopAll")
+            .invoke(null);
+    }
+
+    /**
+     * Closes all the file systems used by the task: reflectively invokes {@code close()}, without a
+     * receiver object, on the task context class loaded through the given class loader.
+     *
+     * @param ldr The task class loader.
+     * @throws Exception On error.
+     */
+    private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception {
+        String taskCtxClsName = HadoopV2TaskContext.class.getName();
+
+        ldr.loadClass(taskCtxClsName)
+            .getMethod("close")
+            .invoke(null);
+    }
+
+ /** {@inheritDoc} */
+ @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+ rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+ HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get();
+
+ taskCtxClsPool.add(ctx.getClass());
+
+ File locDir = taskLocalDir(locNodeId, info);
+
+ if (locDir.exists())
+ U.delete(locDir);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to the job resource manager.
+ */
+ @Override public void cleanupStagingDirectory() {
+ rsrcMgr.cleanupStagingDirectory();
+ }
+
+ /**
+ * Getter for job configuration.
+ * <p>
+ * The internal instance is returned, not a copy.
+ *
+ * @return The job configuration.
+ */
+ public JobConf jobConf() {
+ return jobConf;
+ }
+
+ /**
+ * Gets file system for this job. The lookup goes through the job's file system cache map, which is
+ * closed in {@link #dispose(boolean)}.
+ *
+ * @param uri The uri.
+ * @param cfg The configuration.
+ * @return The file system.
+ * @throws IOException On error.
+ */
+ public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException {
+ return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
new file mode 100644
index 0000000..3984f83
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.RunJar;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.FileSystemException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Locale;
+
+/**
+ * Provides all resources needed for the job execution. Downloads the main jar, the configuration and the
+ * additional files that need to be placed on the local file system.
+ */
+class HadoopV2JobResourceManager {
+ /** File type Fs disable caching property name. */
+ private static final String FILE_DISABLE_CACHING_PROPERTY_NAME =
+ HadoopFileSystemsUtils.disableFsCachePropertyName("file");
+
+ /** Hadoop job context. */
+ private final JobContextImpl ctx;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Job ID. */
+ private final HadoopJobId jobId;
+
+ /** Class path list. */
+ private URL[] clsPath;
+
+ /** Set of local resources. */
+ private final Collection<File> rsrcSet = new HashSet<>();
+
+ /** Staging directory used to deliver the job jar and config to the worker nodes. */
+ private Path stagingDir;
+
+ /** The job. */
+ private final HadoopV2Job job;
+
+ /**
+ * Creates new instance.
+ * @param jobId Job ID.
+ * @param ctx Hadoop job context.
+ * @param log Logger.
+ * @param job The owning job; its {@code fileSystem()} method is used to obtain file systems.
+ */
+ public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) {
+ this.jobId = jobId;
+ this.ctx = ctx;
+ this.log = log.getLogger(HadoopV2JobResourceManager.class);
+ this.job = job;
+ }
+
+    /**
+     * Set working directory in local file system: stores the absolute path in the job configuration and,
+     * unless the caching of the {@code file} file system is disabled there, also sets it on the local
+     * {@link FileSystem} instance.
+     *
+     * @param dir Working directory.
+     * @throws IOException If fails.
+     */
+    private void setLocalFSWorkingDirectory(File dir) throws IOException {
+        JobConf jobCfg = ctx.getJobConf();
+
+        ClassLoader prevLdr = HadoopCommonUtils.setContextClassLoader(jobCfg.getClassLoader());
+
+        try {
+            String workDir = dir.getAbsolutePath();
+
+            jobCfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir);
+
+            boolean fsCacheDisabled = jobCfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false);
+
+            if (!fsCacheDisabled)
+                FileSystem.getLocal(jobCfg).setWorkingDirectory(new Path(workDir));
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(prevLdr);
+        }
+    }
+
+    /**
+     * Prepare job resources. Resolve the classpath list and download it if needed.
+     * <p>
+     * When the job has a job (staging) directory configured, its contents are optionally copied to the local
+     * job directory and the job jar, job configuration, cache files/archives and class path entries are
+     * registered as job resources. Otherwise only the local job directory is created. In both cases the
+     * local job directory becomes the working directory of the local file system.
+     *
+     * @param download {@code true} If need to download resources.
+     * @param jobLocDir Work directory for the job.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException {
+        try {
+            if (jobLocDir.exists())
+                throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath());
+
+            JobConf cfg = ctx.getJobConf();
+
+            // Use the Hadoop constant rather than a string literal, as HadoopV2Job#input() does.
+            String mrDir = cfg.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+
+            if (mrDir != null) {
+                stagingDir = new Path(new URI(mrDir));
+
+                if (download) {
+                    FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg);
+
+                    if (!fs.exists(stagingDir))
+                        throw new IgniteCheckedException("Failed to find map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
+
+                    if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
+                        throw new IgniteCheckedException("Failed to copy job submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
+                }
+
+                File jarJobFile = new File(jobLocDir, "job.jar");
+
+                Collection<URL> clsPathUrls = new ArrayList<>();
+
+                // The job jar always goes first on the class path.
+                clsPathUrls.add(jarJobFile.toURI().toURL());
+
+                rsrcSet.add(jarJobFile);
+                rsrcSet.add(new File(jobLocDir, "job.xml"));
+
+                processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
+                processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
+                processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null);
+                processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null);
+
+                if (!clsPathUrls.isEmpty())
+                    clsPath = clsPathUrls.toArray(new URL[clsPathUrls.size()]);
+            }
+            else if (!jobLocDir.mkdirs())
+                throw new IgniteCheckedException("Failed to create local job directory: "
+                    + jobLocDir.getAbsolutePath());
+
+            setLocalFSWorkingDirectory(jobLocDir);
+        }
+        catch (URISyntaxException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Process list of resources.
+     * <p>
+     * Every resource is registered as a local job resource and, when {@code clsPathUrls} is given, as a
+     * class path entry. When {@code download} is set, the resource is also copied into the job directory;
+     * archives are first copied into the {@code .cached-archives} sub-directory and then unpacked.
+     * Nothing is done when {@code F.isEmptyOrNulls(files)} holds.
+     *
+     * @param jobLocDir Job working directory.
+     * @param files Array of {@link URI} or {@link org.apache.hadoop.fs.Path} to process resources.
+     * @param download {@code true}, if need to download. Process class path only else.
+     * @param extract {@code true}, if need to extract archive.
+     * @param clsPathUrls Collection to add resource as classpath resource.
+     * @param rsrcNameProp Property for resource name array setting.
+     * @throws IOException If failed.
+     */
+    private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract,
+        @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException {
+        if (F.isEmptyOrNulls(files))
+            return;
+
+        Collection<String> res = new ArrayList<>();
+
+        for (Object pathObj : files) {
+            Path srcPath = pathObj instanceof URI ? new Path((URI)pathObj) : (Path)pathObj;
+
+            String locName = srcPath.getName();
+
+            File dstPath = new File(jobLocDir.getAbsolutePath(), locName);
+
+            res.add(locName);
+
+            rsrcSet.add(dstPath);
+
+            if (clsPathUrls != null)
+                clsPathUrls.add(dstPath.toURI().toURL());
+
+            if (!download)
+                continue;
+
+            JobConf cfg = ctx.getJobConf();
+
+            FileSystem dstFs = FileSystem.getLocal(cfg);
+
+            FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg);
+
+            if (extract) {
+                File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
+
+                if (!archivesPath.exists() && !archivesPath.mkdir())
+                    throw new IOException("Failed to create directory " +
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
+
+                File archiveFile = new File(archivesPath, locName);
+
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg);
+
+                // Lower-case independently of the default locale: otherwise the extension checks below
+                // fail for upper-case names such as "LIB.ZIP" under e.g. a Turkish locale, where 'I' is
+                // not lower-cased to a plain 'i'.
+                String archiveNameLC = archiveFile.getName().toLowerCase(Locale.ROOT);
+
+                if (archiveNameLC.endsWith(".jar"))
+                    RunJar.unJar(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".zip"))
+                    FileUtil.unZip(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".tar.gz") ||
+                    archiveNameLC.endsWith(".tgz") ||
+                    archiveNameLC.endsWith(".tar"))
+                    FileUtil.unTar(archiveFile, dstPath);
+                else
+                    throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']');
+            }
+            else
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg);
+        }
+
+        // Publish the local resource names through the given configuration property, if any.
+        if (!res.isEmpty() && rsrcNameProp != null)
+            ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()]));
+    }
+
+    /**
+     * Prepares working directory for the task.
+     *
+     * <ul>
+     * <li>Creates working directory.</li>
+     * <li>Creates symbolic links to all job resources in working directory.</li>
+     * </ul>
+     *
+     * @param path Path to working directory of the task.
+     * @throws IgniteCheckedException If fails.
+     */
+    public void prepareTaskWorkDir(File path) throws IgniteCheckedException {
+        try {
+            if (path.exists())
+                throw new IOException("Task local directory already exists: " + path);
+
+            if (!path.mkdir())
+                throw new IOException("Failed to create directory: " + path);
+
+            for (File rsrc : rsrcSet)
+                linkResource(path, rsrc);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Unable to prepare local working directory for the task " +
+                "[jobId=" + jobId + ", path=" + path+ ']', e);
+        }
+    }
+
+    /**
+     * Creates a symbolic link to a job resource inside the task working directory.
+     *
+     * @param taskDir Task working directory.
+     * @param rsrc Job resource to link to.
+     * @throws IOException If the link cannot be created.
+     */
+    private void linkResource(File taskDir, File rsrc) throws IOException {
+        File link = new File(taskDir, rsrc.getName());
+
+        try {
+            Files.createSymbolicLink(link.toPath(), rsrc.toPath());
+        }
+        catch (IOException e) {
+            // On Windows the failure is usually a missing privilege, so add a hint to the message.
+            String winHint = U.isWindows() && e instanceof FileSystemException ?
+                "\n\nAbility to create symbolic links is required!\n" +
+                    "On Windows platform you have to grant permission 'Create symbolic links'\n" +
+                    "to your user or run the Accelerator as Administrator.\n" :
+                "";
+
+            throw new IOException("Unable to create symlink \"" + link + "\" to \"" + rsrc + "\"." + winHint, e);
+        }
+    }
+
+    /**
+     * Cleans up job staging directory: deletes it recursively if it is known. Failures are logged
+     * and not propagated.
+     */
+    public void cleanupStagingDirectory() {
+        // Nothing to clean up when no staging directory has been resolved.
+        if (stagingDir == null)
+            return;
+
+        try {
+            job.fileSystem(stagingDir.toUri(), ctx.getJobConf()).delete(stagingDir, true);
+        }
+        catch (Exception e) {
+            log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
+        }
+    }
+
+ /**
+ * Returns array of class path for current job.
+ * <p>
+ * The internal array is returned as is (no copy). It is assigned only by
+ * {@link #prepareJobEnvironment(boolean, File)}, and only for jobs that have a job directory configured,
+ * hence the nullable result.
+ *
+ * @return Class path collection.
+ */
+ @Nullable public URL[] classPath() {
+ return clsPath;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
new file mode 100644
index 0000000..418df4e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+/**
+ * Hadoop map task implementation for v2 API.
+ */
+public class HadoopV2MapTask extends HadoopV2Task {
+ /**
+ * @param taskInfo Task info.
+ */
+ public HadoopV2MapTask(HadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"ConstantConditions", "unchecked"})
+ @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+ OutputFormat outputFormat = null;
+ Exception err = null;
+
+ JobContextImpl jobCtx = taskCtx.jobContext();
+
+ try {
+ InputSplit nativeSplit = hadoopContext().getInputSplit();
+
+ if (nativeSplit == null)
+ throw new IgniteCheckedException("Input split cannot be null.");
+
+ InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
+ hadoopContext().getConfiguration());
+
+ RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+
+ reader.initialize(nativeSplit, hadoopContext());
+
+ hadoopContext().reader(reader);
+
+ HadoopJobInfo jobInfo = taskCtx.job().info();
+
+ outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
+
+ Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+
+ try {
+ mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+ }
+ finally {
+ closeWriter();
+ }
+
+ commit(outputFormat);
+ }
+ catch (InterruptedException e) {
+ err = e;
+
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
+ catch (Exception e) {
+ err = e;
+
+ throw new IgniteCheckedException(e);
+ }
+ finally {
+ if (err != null)
+ abort(outputFormat);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java
new file mode 100644
index 0000000..5a82dcf
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+
+/**
+ * Hadoop partitioner adapter for v2 API.
+ */
+public class HadoopV2Partitioner implements HadoopPartitioner {
+    /** Partitioner instance. */
+    private final Partitioner<Object, Object> part;
+
+    /**
+     * Instantiates the given Hadoop partitioner class through {@link ReflectionUtils}.
+     *
+     * @param cls Hadoop partitioner class.
+     * @param conf Job configuration.
+     */
+    @SuppressWarnings("unchecked") // The adapter deals with plain objects only, so the key/value types are cast away.
+    public HadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) {
+        part = (Partitioner<Object, Object>)ReflectionUtils.newInstance(cls, conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key, Object val, int parts) {
+        return part.getPartition(key, val, parts);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
new file mode 100644
index 0000000..930ec1d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+/**
+ * Task that runs a Hadoop v2 (new API) reducer or combiner.
+ */
+public class HadoopV2ReduceTask extends HadoopV2Task {
+    /** {@code True} when this task runs the reducer, {@code false} when it runs the combiner. */
+    private final boolean reduce;
+
+    /**
+     * Creates the task.
+     *
+     * @param taskInfo Task info.
+     * @param reduce {@code True} if reduce, {@code false} if combine.
+     */
+    public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
+        super(taskInfo);
+
+        this.reduce = reduce;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions", "unchecked"})
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        OutputFormat outputFormat = null;
+
+        try {
+            // A writer is prepared for the reduce phase, and for the combine phase when the job has no reducer.
+            if (reduce || !taskCtx.job().info().hasReducer())
+                outputFormat = prepareWriter(jobCtx);
+
+            Class<? extends Reducer> reducerCls = reduce ? jobCtx.getReducerClass() : jobCtx.getCombinerClass();
+
+            Reducer reducer = ReflectionUtils.newInstance(reducerCls, jobCtx.getConfiguration());
+
+            try {
+                reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+            }
+            finally {
+                closeWriter();
+            }
+
+            commit(outputFormat);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            // Any failure aborts the task output before the error is propagated.
+            abort(outputFormat);
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (Exception e) {
+            abort(outputFormat);
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java
new file mode 100644
index 0000000..b466019
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+
+import java.io.IOException;
+
+/**
+ * Hadoop setup task: checks the output specification and lets the output committer set the job up.
+ */
+public class HadoopV2SetupTask extends HadoopV2Task {
+    /**
+     * Creates the task.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV2SetupTask(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override protected void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        try {
+            JobContextImpl jobCtx = taskCtx.jobContext();
+
+            OutputFormat fmt = getOutputFormat(jobCtx);
+
+            fmt.checkOutputSpecs(jobCtx);
+
+            OutputCommitter jobCommitter = fmt.getOutputCommitter(hadoopContext());
+
+            // Without a committer there is nothing to set up.
+            if (jobCommitter == null)
+                return;
+
+            jobCommitter.setupJob(jobCtx);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
new file mode 100644
index 0000000..667ef1e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Splitter for jobs that use the Hadoop v2 (new) API.
+ */
+public class HadoopV2Splitter {
+    /** Host list used when the caller supplies none. */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * Asks the job's input format for its splits and converts each of them to an Ignite split.
+     *
+     * @param ctx Job context.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<HadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration());
+
+            assert format != null;
+
+            List<InputSplit> nativeSplits = format.getSplits(ctx);
+
+            Collection<HadoopInputSplit> mappedSplits = new ArrayList<>(nativeSplits.size());
+
+            // Position of the split in the native list; passed as the ID when a non-file split is wrapped.
+            int idx = 0;
+
+            for (InputSplit nativeSplit : nativeSplits) {
+                HadoopInputSplit mapped;
+
+                if (nativeSplit instanceof FileSplit) {
+                    FileSplit fileSplit = (FileSplit)nativeSplit;
+
+                    mapped = new HadoopFileBlock(fileSplit.getLocations(), fileSplit.getPath().toUri(),
+                        fileSplit.getStart(), fileSplit.getLength());
+                }
+                else
+                    mapped = HadoopUtils.wrapSplit(idx, nativeSplit, nativeSplit.getLocations());
+
+                mappedSplits.add(mapped);
+
+                idx++;
+            }
+
+            return mappedSplits;
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Reads a serialized {@link FileSplit} and converts it to a file block.
+     *
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static HadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts)
+        throws IgniteCheckedException {
+        // Only the exact FileSplit class is handled here.
+        if (!FileSplit.class.getName().equals(clsName))
+            return null;
+
+        FileSplit fileSplit = new FileSplit();
+
+        try {
+            fileSplit.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        String[] blockHosts = hosts != null ? hosts : EMPTY_HOSTS;
+
+        return new HadoopFileBlock(blockHosts, fileSplit.getPath().toUri(), fileSplit.getStart(),
+            fileSplit.getLength());
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java
new file mode 100644
index 0000000..ee8bd98
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Base class for Hadoop v2 (new API) tasks. It creates the {@link HadoopV2Context} for a run and provides
+ * shared helpers for preparing the record writer and driving the output committer.
+ */
+public abstract class HadoopV2Task extends HadoopTask {
+    /**
+     * Hadoop context. It is {@code null} until {@link #run(HadoopTaskContext)} is called. The field is volatile
+     * because {@link #cancel()} may be invoked from a thread other than the one executing the task.
+     */
+    private volatile HadoopV2Context hadoopCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV2Task(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        hadoopCtx = new HadoopV2Context(ctx);
+
+        run0(ctx);
+    }
+
+    /**
+     * Internal task routine.
+     *
+     * @param taskCtx Task context.
+     * @throws IgniteCheckedException If the task failed.
+     */
+    protected abstract void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException;
+
+    /**
+     * @return Hadoop context, or {@code null} if the task has not been started yet.
+     */
+    protected HadoopV2Context hadoopContext() {
+        return hadoopCtx;
+    }
+
+    /**
+     * Create and configure an OutputFormat instance.
+     *
+     * @param jobCtx Job context.
+     * @return Instance of the OutputFormat specified in the job configuration.
+     * @throws ClassNotFoundException If specified class not found.
+     */
+    protected OutputFormat getOutputFormat(JobContext jobCtx) throws ClassNotFoundException {
+        return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), hadoopContext().getConfiguration());
+    }
+
+    /**
+     * Put writer into Hadoop context and return associated output format instance.
+     *
+     * @param jobCtx Job context.
+     * @return Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected OutputFormat prepareWriter(JobContext jobCtx)
+        throws IgniteCheckedException, InterruptedException {
+        HadoopV2Context ctx = hadoopCtx;
+
+        try {
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            assert outputFormat != null;
+
+            OutputCommitter outCommitter = outputFormat.getOutputCommitter(ctx);
+
+            if (outCommitter != null)
+                outCommitter.setupTask(ctx);
+
+            RecordWriter writer = outputFormat.getRecordWriter(ctx);
+
+            ctx.writer(writer);
+
+            return outputFormat;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Closes the writer stored in the Hadoop context, if any.
+     *
+     * @throws Exception If closing the writer failed.
+     */
+    protected void closeWriter() throws Exception {
+        HadoopV2Context ctx = hadoopCtx;
+
+        RecordWriter writer = ctx.writer();
+
+        if (writer != null)
+            writer.close(ctx);
+    }
+
+    /**
+     * Setup task. Does nothing when no writer has been put into the Hadoop context.
+     *
+     * @param outputFormat Output format.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void setup(@Nullable OutputFormat outputFormat) throws IOException, InterruptedException {
+        HadoopV2Context ctx = hadoopCtx;
+
+        if (ctx.writer() != null) {
+            assert outputFormat != null;
+
+            OutputCommitter outCommitter = outputFormat.getOutputCommitter(ctx);
+
+            // Same null handling as in prepareWriter(): an output format may supply no committer.
+            if (outCommitter != null)
+                outCommitter.setupTask(ctx);
+        }
+    }
+
+    /**
+     * Commit task. Does nothing when no writer has been put into the Hadoop context.
+     *
+     * @param outputFormat Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void commit(@Nullable OutputFormat outputFormat)
+        throws IgniteCheckedException, IOException, InterruptedException {
+        HadoopV2Context ctx = hadoopCtx;
+
+        if (ctx.writer() != null) {
+            assert outputFormat != null;
+
+            OutputCommitter outCommitter = outputFormat.getOutputCommitter(ctx);
+
+            // Same null handling as in prepareWriter(): an output format may supply no committer.
+            if (outCommitter != null && outCommitter.needsTaskCommit(ctx))
+                outCommitter.commitTask(ctx);
+        }
+    }
+
+    /**
+     * Abort task. Best effort: an I/O failure while aborting is ignored, and an interrupt only restores
+     * the thread's interrupt flag.
+     *
+     * @param outputFormat Output format.
+     */
+    protected void abort(@Nullable OutputFormat outputFormat) {
+        HadoopV2Context ctx = hadoopCtx;
+
+        if (ctx.writer() != null) {
+            assert outputFormat != null;
+
+            try {
+                OutputCommitter outCommitter = outputFormat.getOutputCommitter(ctx);
+
+                // Same null handling as in prepareWriter(): an output format may supply no committer.
+                if (outCommitter != null)
+                    outCommitter.abortTask(ctx);
+            }
+            catch (IOException ignore) {
+                // Ignore.
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        HadoopV2Context ctx = hadoopCtx;
+
+        // The context does not exist until run() is called; a cancel that arrives earlier has nothing to stop.
+        if (ctx != null)
+            ctx.cancel();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..96fa892
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.unwrapSplit;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends HadoopTaskContext {
+    /** Whether {@link JobContext} declares {@code getCombinerKeyGroupingComparator()}; set in the static block. */
+    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+    /** Lazy per-user file system cache used by the Hadoop task. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
+        = createHadoopLazyConcurrentMap();
+
+    /**
+     * This method is called with reflection upon Job finish with class loader of each task.
+     * This will clean up all the Fs created for specific task.
+     * Each class loader uses its own instance of <code>fsMap</code> since the class loaders
+     * are different.
+     *
+     * @throws IgniteCheckedException On error.
+     */
+    public static void close() throws IgniteCheckedException {
+        fsMap.close();
+    }
+
+    /**
+     * Check for combiner grouping support (available since Hadoop 2.3).
+     */
+    static {
+        boolean ok;
+
+        try {
+            JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+
+            ok = true;
+        }
+        catch (NoSuchMethodException ignore) {
+            ok = false;
+        }
+
+        COMBINE_KEY_GROUPING_SUPPORTED = ok;
+    }
+
+    /** Flag is set if new context-object code is used for running the mapper. */
+    private final boolean useNewMapper;
+
+    /** Flag is set if new context-object code is used for running the reducer. */
+    private final boolean useNewReducer;
+
+    /** Flag is set if new context-object code is used for running the combiner. */
+    private final boolean useNewCombiner;
+
+    /** Hadoop job context built from the deserialized job configuration. */
+    private final JobContextImpl jobCtx;
+
+    /** Set when the task is being cancelled. */
+    private volatile boolean cancelled;
+
+    /** Current task. */
+    private volatile HadoopTask task;
+
+    /** Local node ID. */
+    private final UUID locNodeId;
+
+    /** Counters for task. */
+    private final HadoopCounters cntrs = new HadoopCountersImpl();
+
+    /**
+     * @param taskInfo Task info.
+     * @param job Job.
+     * @param jobId Job ID.
+     * @param locNodeId Local node ID.
+     * @param jobConfDataInput DataInput to read the JobConf from.
+     * @throws IgniteCheckedException If the job configuration could not be read.
+     */
+    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+        super(taskInfo, job);
+        this.locNodeId = locNodeId;
+
+        // Before create JobConf instance we should set new context class loader.
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            JobConf jobConf = new JobConf();
+
+            try {
+                jobConf.readFields(jobConfDataInput);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+
+            // For map-reduce jobs prefer local writes.
+            jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+            jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+            useNewMapper = jobConf.getUseNewMapper();
+            useNewReducer = jobConf.getUseNewReducer();
+
+            // The v2 combiner path is taken when JobConf reports no combiner class.
+            useNewCombiner = jobConf.getCombinerClass() == null;
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+        return cntrs.counter(grp, name, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /**
+     * Creates appropriate task from current task info.
+     *
+     * @return Task, or {@code null} if the task type is not one of the handled types.
+     */
+    private HadoopTask createTask() {
+        boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
+
+        switch (taskInfo().type()) {
+            case SETUP:
+                return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
+
+            case MAP:
+                return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
+
+            case REDUCE:
+                return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
+                    new HadoopV1ReduceTask(taskInfo(), true);
+
+            case COMBINE:
+                return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
+                    new HadoopV1ReduceTask(taskInfo(), false);
+
+            case COMMIT:
+            case ABORT:
+                return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
+                    new HadoopV1CleanupTask(taskInfo(), isAbort);
+
+            default:
+                return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() throws IgniteCheckedException {
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            try {
+                task = createTask();
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+
+            // A cancel request may have arrived before the task was created.
+            if (cancelled)
+                throw new HadoopTaskCancelledException("Task cancelled.");
+
+            try {
+                task.run(this);
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+        }
+        finally {
+            task = null;
+
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+
+        // Read the volatile field once so that the null check and the call see the same task.
+        HadoopTask t = task;
+
+        if (t != null)
+            t.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+        File locDir;
+
+        switch(taskInfo().type()) {
+            case MAP:
+            case REDUCE:
+                job().prepareTaskEnvironment(taskInfo());
+
+                locDir = taskLocalDir(locNodeId, taskInfo());
+
+                break;
+
+            default:
+                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+        }
+
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            FileSystem.get(jobConf());
+
+            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+        }
+        catch (Throwable e) {
+            if (e instanceof Error)
+                throw (Error)e;
+
+            throw transformException(e);
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+        job().cleanupTaskEnvironment(taskInfo());
+    }
+
+    /**
+     * Creates Hadoop attempt ID.
+     *
+     * @return Attempt ID.
+     */
+    public TaskAttemptID attemptId() {
+        TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
+
+        return new TaskAttemptID(tid, taskInfo().attempt());
+    }
+
+    /**
+     * @param type Task type.
+     * @return Hadoop task type, or {@code null} if the type is not one of the handled types.
+     */
+    private TaskType taskType(HadoopTaskType type) {
+        switch (type) {
+            case SETUP:
+                return TaskType.JOB_SETUP;
+
+            case MAP:
+            case COMBINE:
+                return TaskType.MAP;
+
+            case REDUCE:
+                return TaskType.REDUCE;
+
+            case COMMIT:
+            case ABORT:
+                return TaskType.JOB_CLEANUP;
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Gets job configuration of the task.
+     *
+     * @return Job configuration.
+     */
+    public JobConf jobConf() {
+        return jobCtx.getJobConf();
+    }
+
+    /**
+     * Gets job context of the task.
+     *
+     * @return Job context.
+     */
+    public JobContextImpl jobContext() {
+        return jobCtx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+        // If the "mapred.partitioner.class" property is set, the v1 partitioner adapter is used.
+        Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
+
+        if (partClsOld != null)
+            return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
+
+        try {
+            return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
+        }
+        catch (ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Gets serializer for specified class.
+     *
+     * @param cls Class.
+     * @param jobConf Job configuration.
+     * @return Appropriate serializer.
+     * @throws IgniteCheckedException If no serialization is registered for the class.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+        A.notNull(cls, "cls");
+
+        SerializationFactory factory = new SerializationFactory(jobConf);
+
+        Serialization<?> serialization = factory.getSerialization(cls);
+
+        if (serialization == null)
+            throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+        // Writable types get the dedicated implementation instead of the generic wrapper.
+        if (serialization.getClass() == WritableSerialization.class)
+            return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+
+        return new HadoopSerializationWrapper(serialization, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Comparator<Object> sortComparator() {
+        return (Comparator<Object>)jobCtx.getSortComparator();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Comparator<Object> groupComparator() {
+        Comparator<?> res;
+
+        switch (taskInfo().type()) {
+            case COMBINE:
+                res = COMBINE_KEY_GROUPING_SUPPORTED ?
+                    jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
+
+                break;
+
+            case REDUCE:
+                res = jobContext().getGroupingComparator();
+
+                break;
+
+            default:
+                return null;
+        }
+
+        // A grouping comparator of the same class as the sort comparator is reported as absent.
+        if (res != null && res.getClass() != sortComparator().getClass())
+            return (Comparator<Object>)res;
+
+        return null;
+    }
+
+    /**
+     * @param split Split.
+     * @return Native Hadoop split.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopExternalSplit)
+            return readExternalSplit((HadoopExternalSplit)split);
+
+        if (split instanceof HadoopSplitWrapper)
+            return unwrapSplit((HadoopSplitWrapper)split);
+
+        throw new IllegalStateException("Unknown split: " + split);
+    }
+
+    /**
+     * Reads a native split from the job split file, starting at the offset stored in the external split.
+     *
+     * @param split External split.
+     * @return Native input split.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+        FileSystem fs;
+
+        try {
+            // This assertion uses .startsWith() instead of .equals() because task class loaders may
+            // be reused between tasks of the same job.
+            assert ((HadoopClassLoader)getClass().getClassLoader()).name()
+                .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
+
+            // We also cache Fs there, all them will be cleared explicitly upon the Job end.
+            fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        try (FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+            in.seek(split.offset());
+
+            String clsName = Text.readString(in);
+
+            Class<?> cls = jobConf().getClassByName(clsName);
+
+            assert cls != null;
+
+            Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+            // Fail with a descriptive error instead of a NullPointerException, as getSerialization() does.
+            if (serialization == null)
+                throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+            Deserializer deserializer = serialization.getDeserializer(cls);
+
+            deserializer.open(in);
+
+            try {
+                Object res = deserializer.deserialize(null);
+
+                assert res != null;
+
+                return res;
+            }
+            finally {
+                // Close the deserializer even when deserialization fails.
+                deserializer.close();
+            }
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // if current UGI context user is the same, do direct call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (InterruptedException e) {
+            // Keep the interrupt visible to callers; the exception type thrown is unchanged.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteCheckedException(e);
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..e612f1b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * {@link HadoopSerialization} implementation for Hadoop {@link Writable} types: objects are written and read
+ * through their own {@code write} / {@code readFields} methods.
+ */
+public class HadoopWritableSerialization implements HadoopSerialization {
+    /** Writable class handled by this serialization. */
+    private final Class<? extends Writable> cls;
+
+    /**
+     * @param cls Writable class to serialize, must not be {@code null}.
+     */
+    public HadoopWritableSerialization(Class<? extends Writable> cls) {
+        assert cls != null;
+
+        this.cls = cls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+        Writable writable = (Writable)obj;
+
+        try {
+            writable.write(out);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        Writable target;
+
+        // Reuse the passed-in object when there is one, otherwise create a fresh instance to fill.
+        if (obj != null)
+            target = cls.cast(obj);
+        else
+            target = U.newInstance(cls);
+
+        try {
+            target.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Nothing to release.
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..8d5957b
--- /dev/null
+++ b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1 @@
+org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
new file mode 100644
index 0000000..fd72821
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.concurrent.ConcurrentMap;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Test helper: a static registry of named maps, one per class name, used to share objects by key.
+ */
+public class HadoopSharedMap {
+    /** Registry of shared maps keyed by class name. */
+    private static final ConcurrentMap<String, HadoopSharedMap> maps = new ConcurrentHashMap8<>();
+
+    /** Values stored in this shared map. */
+    private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>();
+
+    /**
+     * Instances are created only through {@link #map(Class)}.
+     */
+    private HadoopSharedMap() {
+        // No-op.
+    }
+
+    /**
+     * Stores the value under the key unless the key is already mapped.
+     *
+     * @param key Key.
+     * @param val Value.
+     * @return The value already mapped to the key, or {@code val} if there was none.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T put(String key, T val) {
+        T prev = (T)map.putIfAbsent(key, val);
+
+        if (prev != null)
+            return prev;
+
+        return val;
+    }
+
+    /**
+     * Returns the shared map registered for the given class, creating and registering it on first access.
+     *
+     * @param cls Class.
+     * @return Map of static fields.
+     */
+    public static HadoopSharedMap map(Class<?> cls) {
+        String name = cls.getName();
+
+        HadoopSharedMap res = maps.get(name);
+
+        if (res == null) {
+            HadoopSharedMap created = new HadoopSharedMap();
+
+            // Another thread may have registered a map in the meantime; the registered one wins.
+            res = maps.putIfAbsent(name, created);
+
+            if (res == null)
+                res = created;
+        }
+
+        return res;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java
new file mode 100644
index 0000000..16557ea
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestClassLoader.java
@@ -0,0 +1,89 @@
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Hadoop test class loader aimed to provide better isolation.
+ * <p>
+ * Classes for which {@code HadoopClassLoader.loadByCurrentClassloader(name)} returns {@code true} are looked up
+ * and defined by this loader itself; all other classes go through the regular {@link URLClassLoader} logic.
+ */
+public class HadoopTestClassLoader extends URLClassLoader {
+    /** Parent class loader. The cast requires that this class itself was loaded by a {@link URLClassLoader}. */
+    private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopTestClassLoader.class.getClassLoader();
+
+    /** URLs of the parent class loader. */
+    private static final Collection<URL> APP_JARS = F.asList(APP_CLS_LDR.getURLs());
+
+    /** All participating URLs. */
+    private static final URL[] URLS;
+
+    static {
+        try {
+            List<URL> res = new ArrayList<>();
+
+            // Take only those parent URLs that point into the Hadoop module.
+            for (URL url : APP_JARS) {
+                String urlStr = url.toString();
+
+                if (urlStr.contains("modules/hadoop/"))
+                    res.add(url);
+            }
+
+            res.addAll(HadoopClasspathUtils.classpathForClassLoader());
+
+            X.println(">>> " + HadoopTestClassLoader.class.getSimpleName() + " static paths:");
+
+            for (URL url : res)
+                X.println(">>> \t" + url.toString());
+
+            URLS = res.toArray(new URL[res.size()]);
+        }
+        catch (Exception e) {
+            throw new IgniteException("Failed to initialize class loader JARs.", e);
+        }
+    }
+
+    /**
+     * Constructor.
+     */
+    public HadoopTestClassLoader() {
+        super(URLS, APP_CLS_LDR);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For names accepted by {@code HadoopClassLoader.loadByCurrentClassloader(name)} the class is searched in
+     * this loader only, without delegating to the parent first.
+     *
+     * @throws IgniteException If such a class cannot be found or defined by this loader.
+     */
+    @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        if (HadoopClassLoader.loadByCurrentClassloader(name)) {
+            try {
+                synchronized (getClassLoadingLock(name)) {
+                    // First, check if the class has already been loaded.
+                    Class<?> c = findLoadedClass(name);
+
+                    // No JDK-internal perf counters are updated here: they are not part of the
+                    // supported Java API and must not be relied upon by test code.
+                    if (c == null)
+                        c = findClass(name);
+
+                    if (resolve)
+                        resolveClass(c);
+
+                    return c;
+                }
+            }
+            catch (NoClassDefFoundError | ClassNotFoundException e) {
+                throw new IgniteException("Failed to load class by test class loader: " + name, e);
+            }
+        }
+
+        return super.loadClass(name, resolve);
+    }
+}