You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/26 07:17:19 UTC
[23/46] ignite git commit: IGNITE-3953: Hadoop: Merged back both
modules.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
new file mode 100644
index 0000000..60e62ca
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Ignite Hadoop 1.x <code>FileSystem</code> implementation.
+ */
+package org.apache.ignite.hadoop.fs.v1;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
new file mode 100644
index 0000000..18b8bf9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -0,0 +1,1079 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.fs.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.igfs.common.IgfsLogger;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyInputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProxyOutputStream;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsStreamDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsWrapper;
+import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
+import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
+import org.apache.ignite.internal.processors.igfs.IgfsPaths;
+import org.apache.ignite.internal.processors.igfs.IgfsStatus;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_BATCH_SIZE;
+import static org.apache.ignite.configuration.FileSystemConfiguration.DFLT_IGFS_LOG_DIR;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.getFsHadoopUser;
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_COLOCATED_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_BATCH_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_DIR;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_LOG_ENABLED;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH;
+import static org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsUtils.parameter;
+import static org.apache.ignite.internal.processors.igfs.IgfsEx.IGFS_SCHEME;
+
+/**
+ * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
+ * {@code IGFS} as Hadoop file system, you should configure this class
+ * in Hadoop's {@code core-site.xml} as follows:
+ * <pre name="code" class="xml">
+ * <property>
+ * <name>fs.default.name</name>
+ * <value>igfs://ipc</value>
+ * </property>
+ *
+ * <property>
+ * <name>fs.igfs.impl</name>
+ * <value>org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem</value>
+ * </property>
+ * </pre>
+ * You should also add Ignite JAR and all libraries to Hadoop classpath. To
+ * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop
+ * distribution:
+ * <pre name="code" class="bash">
+ * export IGNITE_HOME=/path/to/Ignite/distribution
+ * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
+ *
+ * for f in $IGNITE_HOME/libs/*.jar; do
+ * export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
+ * done
+ * </pre>
+ * <h1 class="header">Data vs Clients Nodes</h1>
+ * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on
+ * data nodes. Client nodes are responsible for basic file system operations as well as
+ * accessing data nodes remotely. Usually, client nodes are started together
+ * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually
+ * started together with Hadoop {@code task-tracker} processes.
+ * <p>
+ * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml}
+ * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation.
+ */
+public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closeable {
+ /** Logger. */
+ private static final Log LOG = LogFactory.getLog(IgniteHadoopFileSystem.class);
+
+ /** Ensures that close routine is invoked at most once. */
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Grid remote client. */
+ private HadoopIgfsWrapper rmtClient;
+
+ /** The name of the user this File System created on behalf of. */
+ private final String user;
+
+ /** Working directory. */
+ private IgfsPath workingDir;
+
+ /** URI. */
+ private final URI uri;
+
+ /** Authority. */
+ private String uriAuthority;
+
+ /** Client logger. */
+ private IgfsLogger clientLog;
+
+ /** Server block size. */
+ private long grpBlockSize;
+
+ /** Default replication factor. */
+ private short dfltReplication;
+
+ /** Secondary URI string. */
+ private URI secondaryUri;
+
+ /** Mode resolver. */
+ private IgfsModeResolver modeRslvr;
+
+ /** The secondary file system factory. */
+ private HadoopFileSystemFactoryDelegate factory;
+
+ /** Whether custom sequential reads before prefetch value is provided. */
+ private boolean seqReadsBeforePrefetchOverride;
+
+ /** Custom-provided sequential reads before prefetch. */
+ private int seqReadsBeforePrefetch;
+
+ /** Flag that controls whether file writes should be colocated on data node. */
+ private boolean colocateFileWrites;
+
+ /** Prefer local writes. */
+ private boolean preferLocFileWrites;
+
+ /**
+ * @param name URI for file system.
+ * @param cfg Configuration.
+ * @throws URISyntaxException if name has invalid syntax.
+ * @throws IOException If initialization failed.
+ */
+ public IgniteHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException {
+ super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1);
+
+ uri = name;
+
+ user = getFsHadoopUser();
+
+ try {
+ initialize(name, cfg);
+ }
+ catch (IOException e) {
+ // Close client if exception occurred.
+ if (rmtClient != null)
+ rmtClient.close(false);
+
+ throw e;
+ }
+
+ workingDir = new IgfsPath("/user/" + user);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkPath(Path path) {
+ URI uri = path.toUri();
+
+ if (uri.isAbsolute()) {
+ if (!F.eq(uri.getScheme(), IGFS_SCHEME))
+ throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" +
+ uri.getAuthority() + ']');
+
+ if (!F.eq(uri.getAuthority(), uriAuthority))
+ throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" +
+ uri.getAuthority() + ']');
+ }
+ }
+
+ /**
+ * Public setter that can be used by direct users of FS or Visor.
+ *
+ * @param colocateFileWrites Whether all ongoing file writes should be colocated.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void colocateFileWrites(boolean colocateFileWrites) {
+ this.colocateFileWrites = colocateFileWrites;
+ }
+
+ /**
+ * Enter busy state.
+ *
+ * @throws IOException If file system is stopped.
+ */
+ private void enterBusy() throws IOException {
+ if (closeGuard.get())
+ throw new IOException("File system is stopped.");
+ }
+
+ /**
+ * Leave busy state.
+ */
+ private void leaveBusy() {
+ // No-op.
+ }
+
+ /**
+ * @param name URI passed to constructor.
+ * @param cfg Configuration passed to constructor.
+ * @throws IOException If initialization failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private void initialize(URI name, Configuration cfg) throws IOException {
+ enterBusy();
+
+ try {
+ if (rmtClient != null)
+ throw new IOException("File system is already initialized: " + rmtClient);
+
+ A.notNull(name, "name");
+ A.notNull(cfg, "cfg");
+
+ if (!IGFS_SCHEME.equals(name.getScheme()))
+ throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME +
+ "://[name]/[optional_path], actual=" + name + ']');
+
+ uriAuthority = name.getAuthority();
+
+ // Override sequential reads before prefetch if needed.
+ seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
+
+ if (seqReadsBeforePrefetch > 0)
+ seqReadsBeforePrefetchOverride = true;
+
+ // In Ignite replication factor is controlled by data cache affinity.
+ // We use replication factor to force the whole file to be stored on local node.
+ dfltReplication = (short)cfg.getInt("dfs.replication", 3);
+
+ // Get file colocation control flag.
+ colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false);
+ preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false);
+
+ // Get log directory.
+ String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR);
+
+ File logDirFile = U.resolveIgnitePath(logDirCfg);
+
+ String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
+
+ rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
+
+ // Handshake.
+ IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
+
+ grpBlockSize = handshake.blockSize();
+
+ IgfsPaths paths = handshake.secondaryPaths();
+
+ Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false);
+
+ if (handshake.sampling() != null ? handshake.sampling() : logEnabled) {
+ // Initiate client logger.
+ if (logDir == null)
+ throw new IOException("Failed to resolve log directory: " + logDirCfg);
+
+ Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE);
+
+ clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize);
+ }
+ else
+ clientLog = IgfsLogger.disabledLogger();
+
+ try {
+ modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes());
+ }
+ catch (IgniteCheckedException ice) {
+ throw new IOException(ice);
+ }
+
+ boolean initSecondary = paths.defaultMode() == PROXY;
+
+ if (!initSecondary && paths.pathModes() != null) {
+ for (T2<IgfsPath, IgfsMode> pathMode : paths.pathModes()) {
+ IgfsMode mode = pathMode.getValue();
+
+ if (mode == PROXY) {
+ initSecondary = true;
+
+ break;
+ }
+ }
+ }
+
+ if (initSecondary) {
+ try {
+ HadoopFileSystemFactory factory0 =
+ (HadoopFileSystemFactory) paths.getPayload(getClass().getClassLoader());
+
+ factory = HadoopDelegateUtils.fileSystemFactoryDelegate(factory0);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to get secondary file system factory.", e);
+ }
+
+ if (factory == null)
+ throw new IOException("Failed to get secondary file system factory (did you set " +
+ IgniteHadoopIgfsSecondaryFileSystem.class.getName() + " as \"secondaryFIleSystem\" in " +
+ FileSystemConfiguration.class.getName() + "?)");
+
+ assert factory != null;
+
+ factory.start();
+
+ try {
+ FileSystem secFs = (FileSystem)factory.get(user);
+
+ secondaryUri = secFs.getUri();
+
+ A.ensure(secondaryUri != null, "Secondary file system uri should not be null.");
+ }
+ catch (IOException e) {
+ throw new IOException("Failed to connect to the secondary file system: " + secondaryUri, e);
+ }
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ if (closeGuard.compareAndSet(false, true)) {
+ if (rmtClient == null)
+ return;
+
+ rmtClient.close(false);
+
+ if (clientLog.isLogEnabled())
+ clientLog.close();
+
+ if (factory != null)
+ factory.stop();
+
+ // Reset initialized resources.
+ rmtClient = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ return uri;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getUriDefaultPort() {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FsServerDefaults getServerDefaults() throws IOException {
+ return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024,
+ false, 0, DataChecksum.Type.NULL);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean setReplication(Path f, short replication) throws IOException {
+ return mode(f) == PROXY && secondaryFileSystem().setReplication(f, replication);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTimes(Path f, long mtime, long atime) throws IOException {
+ if (mode(f) == PROXY)
+ secondaryFileSystem().setTimes(f, mtime, atime);
+ else {
+ if (mtime == -1 && atime == -1)
+ return;
+
+ rmtClient.setTimes(convert(f), atime, mtime);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public FsStatus getFsStatus() throws IOException {
+ IgfsStatus status = rmtClient.fsStatus();
+
+ return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setPermission(Path p, FsPermission perm) throws IOException {
+ enterBusy();
+
+ try {
+ A.notNull(p, "p");
+
+ if (mode(p) == PROXY)
+ secondaryFileSystem().setPermission(toSecondary(p), perm);
+ else {
+ if (rmtClient.update(convert(p), permission(perm)) == null)
+ throw new IOException("Failed to set file permission (file not found?)" +
+ " [path=" + p + ", perm=" + perm + ']');
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setOwner(Path p, String usr, String grp) throws IOException {
+ A.notNull(p, "p");
+ A.notNull(usr, "username");
+ A.notNull(grp, "grpName");
+
+ enterBusy();
+
+ try {
+ if (mode(p) == PROXY)
+ secondaryFileSystem().setOwner(toSecondary(p), usr, grp);
+ else if (rmtClient.update(convert(p), F.asMap(IgfsUtils.PROP_USER_NAME, usr,
+ IgfsUtils.PROP_GROUP_NAME, grp)) == null) {
+ throw new IOException("Failed to set file permission (file not found?)" +
+ " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']');
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufSize) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = modeRslvr.resolveMode(path);
+
+ if (mode == PROXY) {
+ FSDataInputStream is = secondaryFileSystem().open(toSecondary(f), bufSize);
+
+ if (clientLog.isLogEnabled()) {
+ // At this point we do not know file size, so we perform additional request to remote FS to get it.
+ FileStatus status = secondaryFileSystem().getFileStatus(toSecondary(f));
+
+ long size = status != null ? status.getLen() : -1;
+
+ long logId = IgfsLogger.nextId();
+
+ clientLog.logOpen(logId, path, PROXY, bufSize, size);
+
+ return new FSDataInputStream(new HadoopIgfsProxyInputStream(is, clientLog, logId));
+ }
+ else
+ return is;
+ }
+ else {
+ HadoopIgfsStreamDelegate stream = seqReadsBeforePrefetchOverride ?
+ rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path);
+
+ long logId = -1;
+
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
+
+ clientLog.logOpen(logId, path, mode, bufSize, stream.length());
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path +
+ ", bufSize=" + bufSize + ']');
+
+ HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(),
+ bufSize, LOG, clientLog, logId);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']');
+
+ return new FSDataInputStream(igfsIn);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public FSDataOutputStream createInternal(
+ Path f,
+ EnumSet<CreateFlag> flag,
+ FsPermission perm,
+ int bufSize,
+ short replication,
+ long blockSize,
+ Progressable progress,
+ Options.ChecksumOpt checksumOpt,
+ boolean createParent
+ ) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean append = flag.contains(CreateFlag.APPEND);
+ boolean create = flag.contains(CreateFlag.CREATE);
+
+ OutputStream out = null;
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = modeRslvr.resolveMode(path);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" +
+ path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']');
+
+ if (mode == PROXY) {
+ FSDataOutputStream os = secondaryFileSystem().create(toSecondary(f), perm, flag, bufSize,
+ replication, blockSize, progress);
+
+ if (clientLog.isLogEnabled()) {
+ long logId = IgfsLogger.nextId();
+
+ if (append)
+ clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID.
+ else
+ clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize);
+
+ return new FSDataOutputStream(new HadoopIgfsProxyOutputStream(os, clientLog, logId));
+ }
+ else
+ return os;
+ }
+ else {
+ Map<String, String> permMap = F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm),
+ IgfsUtils.PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites));
+
+ // Create stream and close it in the 'finally' section if any sequential operation failed.
+ HadoopIgfsStreamDelegate stream;
+
+ long logId = -1;
+
+ if (append) {
+ stream = rmtClient.append(path, create, permMap);
+
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
+
+ clientLog.logAppend(logId, path, mode, bufSize);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']');
+ }
+ else {
+ stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize,
+ permMap);
+
+ if (clientLog.isLogEnabled()) {
+ logId = IgfsLogger.nextId();
+
+ clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']');
+ }
+
+ assert stream != null;
+
+ HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG,
+ clientLog, logId);
+
+ bufSize = Math.max(64 * 1024, bufSize);
+
+ out = new BufferedOutputStream(igfsOut, bufSize);
+
+ FSDataOutputStream res = new FSDataOutputStream(out, null, 0);
+
+ // Mark stream created successfully.
+ out = null;
+
+ return res;
+ }
+ }
+ finally {
+ // Close if failed during stream creation.
+ if (out != null)
+ U.closeQuiet(out);
+
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean supportsSymlinks() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void renameInternal(Path src, Path dst) throws IOException {
+ A.notNull(src, "src");
+ A.notNull(dst, "dst");
+
+ enterBusy();
+
+ try {
+ IgfsPath srcPath = convert(src);
+ IgfsPath dstPath = convert(dst);
+
+ IgfsMode srcMode = modeRslvr.resolveMode(srcPath);
+
+ if (clientLog.isLogEnabled())
+ clientLog.logRename(srcPath, srcMode, dstPath);
+
+ if (srcMode == PROXY)
+ secondaryFileSystem().rename(toSecondary(src), toSecondary(dst));
+ else
+ rmtClient.rename(srcPath, dstPath);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+
+ IgfsMode mode = modeRslvr.resolveMode(path);
+
+ if (mode == PROXY) {
+ if (clientLog.isLogEnabled())
+ clientLog.logDelete(path, PROXY, recursive);
+
+ return secondaryFileSystem().delete(toSecondary(f), recursive);
+ }
+
+ boolean res = rmtClient.delete(path, recursive);
+
+ if (clientLog.isLogEnabled())
+ clientLog.logDelete(path, mode, recursive);
+
+ return res;
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+ // Checksum has effect for secondary FS only.
+ if (factory != null)
+ secondaryFileSystem().setVerifyChecksum(verifyChecksum);
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileChecksum getFileChecksum(Path f) throws IOException {
+ if (mode(f) == PROXY)
+ return secondaryFileSystem().getFileChecksum(f);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = modeRslvr.resolveMode(path);
+
+ if (mode == PROXY) {
+ FileStatus[] arr = secondaryFileSystem().listStatus(toSecondary(f));
+
+ if (arr == null)
+ throw new FileNotFoundException("File " + f + " does not exist.");
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = toPrimary(arr[i]);
+
+ if (clientLog.isLogEnabled()) {
+ String[] fileArr = new String[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ fileArr[i] = arr[i].getPath().toString();
+
+ clientLog.logListDirectory(path, PROXY, fileArr);
+ }
+
+ return arr;
+ }
+ else {
+ Collection<IgfsFile> list = rmtClient.listFiles(path);
+
+ if (list == null)
+ throw new FileNotFoundException("File " + f + " does not exist.");
+
+ List<IgfsFile> files = new ArrayList<>(list);
+
+ FileStatus[] arr = new FileStatus[files.size()];
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = convert(files.get(i));
+
+ if (clientLog.isLogEnabled()) {
+ String[] fileArr = new String[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ fileArr[i] = arr[i].getPath().toString();
+
+ clientLog.logListDirectory(path, mode, fileArr);
+ }
+
+ return arr;
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ IgfsPath path = convert(f);
+ IgfsMode mode = modeRslvr.resolveMode(path);
+
+ if (mode == PROXY) {
+ if (clientLog.isLogEnabled())
+ clientLog.logMakeDirectory(path, PROXY);
+
+ secondaryFileSystem().mkdirs(toSecondary(f), perm);
+ }
+ else {
+ rmtClient.mkdirs(path, permission(perm));
+
+ if (clientLog.isLogEnabled())
+ clientLog.logMakeDirectory(path, mode);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ A.notNull(f, "f");
+
+ enterBusy();
+
+ try {
+ if (mode(f) == PROXY)
+ return toPrimary(secondaryFileSystem().getFileStatus(toSecondary(f)));
+ else {
+ IgfsFile info = rmtClient.info(convert(f));
+
+ if (info == null)
+ throw new FileNotFoundException("File not found: " + f);
+
+ return convert(info);
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
+ A.notNull(path, "path");
+
+ IgfsPath igfsPath = convert(path);
+
+ enterBusy();
+
+ try {
+ if (modeRslvr.resolveMode(igfsPath) == PROXY)
+ return secondaryFileSystem().getFileBlockLocations(path, start, len);
+ else {
+ long now = System.currentTimeMillis();
+
+ List<IgfsBlockLocation> affinity = new ArrayList<>(
+ rmtClient.affinity(igfsPath, start, len));
+
+ BlockLocation[] arr = new BlockLocation[affinity.size()];
+
+ for (int i = 0; i < arr.length; i++)
+ arr[i] = convert(affinity.get(i));
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
+ (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
+
+ return arr;
+ }
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * Resolve path mode.
+ *
+ * @param path HDFS path.
+ * @return Path mode.
+ */
+ public IgfsMode mode(Path path) {
+ return modeRslvr.resolveMode(convert(path));
+ }
+
+ /**
+ * Convert the given path to path acceptable by the primary file system.
+ *
+ * @param path Path.
+ * @return Primary file system path.
+ */
+ private Path toPrimary(Path path) {
+ return convertPath(path, getUri());
+ }
+
+ /**
+ * Convert the given path to path acceptable by the secondary file system.
+ *
+ * @param path Path.
+ * @return Secondary file system path.
+ */
+ private Path toSecondary(Path path) {
+ assert factory != null;
+ assert secondaryUri != null;
+
+ return convertPath(path, secondaryUri);
+ }
+
+ /**
+ * Convert path using the given new URI.
+ *
+ * @param path Old path.
+ * @param newUri New URI.
+ * @return New path.
+ */
+ private Path convertPath(Path path, URI newUri) {
+ assert newUri != null;
+
+ if (path != null) {
+ URI pathUri = path.toUri();
+
+ try {
+ return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
+ pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
+ }
+ catch (URISyntaxException e) {
+ throw new IgniteException("Failed to construct secondary file system path from the primary file " +
+ "system path: " + path, e);
+ }
+ }
+ else
+ return null;
+ }
+
+ /**
+ * Convert a file status obtained from the secondary file system to a status of the primary file system.
+ *
+ * @param status Secondary file system status.
+ * @return Primary file system status.
+ */
+ private FileStatus toPrimary(FileStatus status) {
+ return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(),
+ status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
+ status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
+ }
+
+ /**
+ * Convert IGFS path into Hadoop path.
+ *
+ * @param path IGFS path.
+ * @return Hadoop path.
+ */
+ private Path convert(IgfsPath path) {
+ return new Path(IGFS_SCHEME, uriAuthority, path.toString());
+ }
+
+ /**
+ * Convert Hadoop path into IGFS path.
+ *
+ * @param path Hadoop path.
+ * @return IGFS path.
+ */
+ @Nullable private IgfsPath convert(Path path) {
+ if (path == null)
+ return null;
+
+ return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
+ new IgfsPath(workingDir, path.toUri().getPath());
+ }
+
+ /**
+ * Convert IGFS affinity block location into Hadoop affinity block location.
+ *
+ * @param block IGFS affinity block location.
+ * @return Hadoop affinity block location.
+ */
+ private BlockLocation convert(IgfsBlockLocation block) {
+ Collection<String> names = block.names();
+ Collection<String> hosts = block.hosts();
+
+ return new BlockLocation(
+ names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
+ hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
+ block.start(), block.length()
+ ) {
+ @Override public String toString() {
+ try {
+ return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
+ ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Convert IGFS file information into Hadoop file status.
+ *
+ * @param file IGFS file information.
+ * @return Hadoop file status.
+ */
+ private FileStatus convert(IgfsFile file) {
+ return new FileStatus(
+ file.length(),
+ file.isDirectory(),
+ dfltReplication,
+ file.groupBlockSize(),
+ file.modificationTime(),
+ file.accessTime(),
+ permission(file),
+ file.property(IgfsUtils.PROP_USER_NAME, user),
+ file.property(IgfsUtils.PROP_GROUP_NAME, "users"),
+ convert(file.path())) {
+ @Override public String toString() {
+ return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]";
+ }
+ };
+ }
+
+ /**
+ * Convert Hadoop permission into IGFS file attribute.
+ *
+ * @param perm Hadoop permission.
+ * @return IGFS attributes.
+ */
+ private Map<String, String> permission(FsPermission perm) {
+ if (perm == null)
+ perm = FsPermission.getDefault();
+
+ return F.asMap(IgfsUtils.PROP_PERMISSION, toString(perm));
+ }
+
+ /**
+ * @param perm Permission.
+ * @return String.
+ */
+ private static String toString(FsPermission perm) {
+ return String.format("%04o", perm.toShort());
+ }
+
+ /**
+ * Convert IGFS file attributes into Hadoop permission.
+ *
+ * @param file File info.
+ * @return Hadoop permission.
+ */
+ private FsPermission permission(IgfsFile file) {
+ String perm = file.property(IgfsUtils.PROP_PERMISSION, null);
+
+ if (perm == null)
+ return FsPermission.getDefault();
+
+ try {
+ return new FsPermission((short)Integer.parseInt(perm, 8));
+ }
+ catch (NumberFormatException ignore) {
+ return FsPermission.getDefault();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteHadoopFileSystem.class, this);
+ }
+
+ /**
+ * Returns the user name this File System is created on behalf of.
+ * @return the user name
+ */
+ public String user() {
+ return user;
+ }
+
+ /**
+ * Gets cached or creates a {@link FileSystem}.
+ *
+ * @return The secondary file system.
+ */
+ private FileSystem secondaryFileSystem() throws IOException{
+ assert factory != null;
+
+ return (FileSystem)factory.get(user);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
new file mode 100644
index 0000000..d8e70d1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Ignite Hadoop 2.x <code>FileSystem</code> implementation.
+ */
+package org.apache.ignite.hadoop.fs.v2;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
new file mode 100644
index 0000000..343b5ed
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientException;
+import org.apache.ignite.internal.client.GridClientFactory;
+import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
+import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.ignite.internal.client.GridClientProtocol.TCP;
+
+
+/**
+ * Ignite Hadoop client protocol provider.
+ */
+public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider {
+ /** Framework name used in configuration. */
+ public static final String FRAMEWORK_NAME = "ignite";
+
+ /** Clients. */
+ private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public ClientProtocol create(Configuration conf) throws IOException {
+ if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+ String addr = conf.get(MRConfig.MASTER_ADDRESS);
+
+ if (F.isEmpty(addr))
+ throw new IOException("Failed to create client protocol because server address is not specified (is " +
+ MRConfig.MASTER_ADDRESS + " property set?).");
+
+ if (F.eq(addr, "local"))
+ throw new IOException("Local execution mode is not supported, please point " +
+ MRConfig.MASTER_ADDRESS + " to real Ignite node.");
+
+ return createProtocol(addr, conf);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+ if (FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME)))
+ return createProtocol(addr.getHostString() + ":" + addr.getPort(), conf);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close(ClientProtocol cliProto) throws IOException {
+ // No-op.
+ }
+
+ /**
+ * Internal protocol creation routine.
+ *
+ * @param addr Address.
+ * @param conf Configuration.
+ * @return Client protocol.
+ * @throws IOException If failed.
+ */
+ private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException {
+ return new HadoopClientProtocol(conf, client(addr));
+ }
+
+ /**
+ * Create client.
+ *
+ * @param addr Endpoint address.
+ * @return Client.
+ * @throws IOException If failed.
+ */
+ private static GridClient client(String addr) throws IOException {
+ try {
+ IgniteInternalFuture<GridClient> fut = cliMap.get(addr);
+
+ if (fut == null) {
+ GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>();
+
+ IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(addr, fut0);
+
+ if (oldFut != null)
+ return oldFut.get();
+ else {
+ GridClientConfiguration cliCfg = new GridClientConfiguration();
+
+ cliCfg.setProtocol(TCP);
+ cliCfg.setServers(Collections.singletonList(addr));
+ cliCfg.setMarshaller(new GridClientJdkMarshaller());
+ cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day.
+ cliCfg.setDaemon(true);
+
+ try {
+ GridClient cli = GridClientFactory.start(cliCfg);
+
+ fut0.onDone(cli);
+
+ return cli;
+ }
+ catch (GridClientException e) {
+ fut0.onDone(e);
+
+ throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
+ }
+ }
+ }
+ else
+ return fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to establish connection with Ignite node: " + addr, e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
new file mode 100644
index 0000000..7635b9e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Ignite Hadoop Accelerator map-reduce classes.
+ */
+package org.apache.ignite.hadoop.mapreduce;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
index ed39ce5..2e75e5f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
@@ -194,7 +194,6 @@ public class HadoopSetup {
addJarsInFolder(jarFiles, igniteLibs);
addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
- addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
boolean jarsLinksCorrect = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..0ab64d9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Hadoop +counter group adapter.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+ /** Counters. */
+ private final HadoopMapReduceCounters cntrs;
+
+ /** Group name. */
+ private final String name;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntrs Client counters instance.
+ * @param name Group name.
+ */
+ HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+ this.cntrs = cntrs;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addCounter(Counter counter) {
+ addCounter(counter.getName(), counter.getDisplayName(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter addCounter(String name, String displayName, long value) {
+ final Counter counter = cntrs.findCounter(this.name, name);
+
+ counter.setValue(value);
+
+ return counter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, String displayName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, boolean create) {
+ return cntrs.findCounter(name, counterName, create);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return cntrs.groupSize(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+ for (final Counter counter : rightGroup)
+ cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Counter> iterator() {
+ return cntrs.iterateGroup(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
new file mode 100644
index 0000000..df5c1ee
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceCounters.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.counters.AbstractCounters;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Counter;
+import org.apache.ignite.internal.util.typedef.T2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Hadoop counters adapter.
+ */
+public class HadoopMapReduceCounters extends Counters {
+ /** */
+ private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
+
+ /**
+ * Creates new instance based on given counters.
+ *
+ * @param cntrs Counters to adapt.
+ */
+ public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
+ for (HadoopCounter cntr : cntrs.all())
+ if (cntr instanceof HadoopLongCounter)
+ this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+ return addGroup(grp.getName(), grp.getDisplayName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroup addGroup(String name, String displayName) {
+ return new HadoopMapReduceCounterGroup(this, name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String grpName, String cntrName) {
+ return findCounter(grpName, cntrName, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(Enum<?> key) {
+ return findCounter(key.getDeclaringClass().getName(), key.name(), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
+ return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Iterable<String> getGroupNames() {
+ Collection<String> res = new HashSet<>();
+
+ for (HadoopCounter counter : cntrs.values())
+ res.add(counter.group());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<CounterGroup> iterator() {
+ final Iterator<String> iter = getGroupNames().iterator();
+
+ return new Iterator<CounterGroup>() {
+ @Override public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override public CounterGroup next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException("not implemented");
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup getGroup(String grpName) {
+ return new HadoopMapReduceCounterGroup(this, grpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int countCounters() {
+ return cntrs.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+ for (CounterGroup group : other) {
+ for (Counter counter : group) {
+ findCounter(group.getName(), counter.getName()).increment(counter.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object genericRight) {
+ if (!(genericRight instanceof HadoopMapReduceCounters))
+ return false;
+
+ return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return cntrs.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWriteAllCounters(boolean snd) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean getWriteAllCounters() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Limits limits() {
+ return null;
+ }
+
+ /**
+ * Returns size of a group.
+ *
+ * @param grpName Name of the group.
+ * @return amount of counters in the given group.
+ */
+ public int groupSize(String grpName) {
+ int res = 0;
+
+ for (HadoopCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ res++;
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns counters iterator for specified group.
+ *
+ * @param grpName Name of the group to iterate.
+ * @return Counters iterator.
+ */
+ public Iterator<Counter> iterateGroup(String grpName) {
+ Collection<Counter> grpCounters = new ArrayList<>();
+
+ for (HadoopLongCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ grpCounters.add(new HadoopV2Counter(counter));
+ }
+
+ return grpCounters.iterator();
+ }
+
+ /**
+ * Find a counter in the group.
+ *
+ * @param grpName The name of the counter group.
+ * @param cntrName The name of the counter.
+ * @param create Create the counter if not found if true.
+ * @return The counter that was found or added or {@code null} if create is false.
+ */
+ public Counter findCounter(String grpName, String cntrName, boolean create) {
+ T2<String, String> key = new T2<>(grpName, cntrName);
+
+ HadoopLongCounter internalCntr = cntrs.get(key);
+
+ if (internalCntr == null & create) {
+ internalCntr = new HadoopLongCounter(grpName,cntrName);
+
+ cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
+ }
+
+ return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
new file mode 100644
index 0000000..347bfae
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Hadoop utility methods.
+ */
+public class HadoopUtils {
+ /** Staging constant. */
+ private static final String STAGING_CONSTANT = ".staging";
+
+ /** Old mapper class attribute. */
+ private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
+
+ /** Old reducer class attribute. */
+ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+
+ /**
+ * Constructor.
+ */
+ private HadoopUtils() {
+ // No-op.
+ }
+
+ /**
+ * Wraps native split.
+ *
+ * @param id Split ID.
+ * @param split Split.
+ * @param hosts Hosts.
+ * @throws IOException If failed.
+ */
+ public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
+ ByteArrayOutputStream arr = new ByteArrayOutputStream();
+ ObjectOutput out = new ObjectOutputStream(arr);
+
+ assert split instanceof Writable;
+
+ ((Writable)split).write(out);
+
+ out.flush();
+
+ return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
+ }
+
+ /**
+ * Unwraps native split.
+ *
+ * @param o Wrapper.
+ * @return Split.
+ */
+ public static Object unwrapSplit(HadoopSplitWrapper o) {
+ try {
+ Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
+
+ w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
+
+ return w;
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Convert Ignite job status to Hadoop job status.
+ *
+ * @param status Ignite job status.
+ * @return Hadoop job status.
+ */
+ public static JobStatus status(HadoopJobStatus status, Configuration conf) {
+ JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
+
+ float setupProgress = 0;
+ float mapProgress = 0;
+ float reduceProgress = 0;
+ float cleanupProgress = 0;
+
+ JobStatus.State state = JobStatus.State.RUNNING;
+
+ switch (status.jobPhase()) {
+ case PHASE_SETUP:
+ setupProgress = 0.42f;
+
+ break;
+
+ case PHASE_MAP:
+ setupProgress = 1;
+ mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
+
+ break;
+
+ case PHASE_REDUCE:
+ setupProgress = 1;
+ mapProgress = 1;
+
+ if (status.totalReducerCnt() > 0)
+ reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+ else
+ reduceProgress = 1f;
+
+ break;
+
+ case PHASE_CANCELLING:
+ case PHASE_COMPLETE:
+ if (!status.isFailed()) {
+ setupProgress = 1;
+ mapProgress = 1;
+ reduceProgress = 1;
+ cleanupProgress = 1;
+
+ state = JobStatus.State.SUCCEEDED;
+ }
+ else
+ state = JobStatus.State.FAILED;
+
+ break;
+
+ default:
+ assert false;
+ }
+
+ return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
+ JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
+ }
+
+ /**
+ * Gets staging area directory.
+ *
+ * @param conf Configuration.
+ * @param usr User.
+ * @return Staging area directory.
+ */
+ public static Path stagingAreaDir(Configuration conf, String usr) {
+ return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
+ }
+
+ /**
+ * Gets job file.
+ *
+ * @param conf Configuration.
+ * @param usr User.
+ * @param jobId Job ID.
+ * @return Job file.
+ */
+ public static Path jobFile(Configuration conf, String usr, JobID jobId) {
+ return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
+ }
+
+ /**
+ * Checks the attribute in configuration is not set.
+ *
+ * @param attr Attribute name.
+ * @param msg Message for creation of exception.
+ * @throws IgniteCheckedException If attribute is set.
+ */
+ public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
+ if (cfg.get(attr) != null)
+ throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
+ }
+
+ /**
+ * Creates JobInfo from hadoop configuration.
+ *
+ * @param cfg Hadoop configuration.
+ * @return Job info.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+ JobConf jobConf = new JobConf(cfg);
+
+ boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
+ || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
+
+ int numReduces = jobConf.getNumReduceTasks();
+
+ jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
+
+ if (jobConf.getUseNewMapper()) {
+ String mode = "new map API";
+
+ ensureNotSet(jobConf, "mapred.input.format.class", mode);
+ ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
+
+ if (numReduces != 0)
+ ensureNotSet(jobConf, "mapred.partitioner.class", mode);
+ else
+ ensureNotSet(jobConf, "mapred.output.format.class", mode);
+ }
+ else {
+ String mode = "map compatibility";
+
+ ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
+
+ if (numReduces != 0)
+ ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
+ else
+ ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
+ }
+
+ if (numReduces != 0) {
+ jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
+
+ if (jobConf.getUseNewReducer()) {
+ String mode = "new reduce API";
+
+ ensureNotSet(jobConf, "mapred.output.format.class", mode);
+ ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
+ }
+ else {
+ String mode = "reduce compatibility";
+
+ ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
+ }
+ }
+
+ Map<String, String> props = new HashMap<>();
+
+ for (Map.Entry<String, String> entry : jobConf)
+ props.put(entry.getKey(), entry.getValue());
+
+ return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+ }
+
+ /**
+ * Throws new {@link IgniteCheckedException} with original exception is serialized into string.
+ * This is needed to transfer error outside the current class loader.
+ *
+ * @param e Original exception.
+ * @return IgniteCheckedException New exception.
+ */
+ public static IgniteCheckedException transformException(Throwable e) {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+ e.printStackTrace(new PrintStream(os, true));
+
+ return new IgniteCheckedException(os.toString());
+ }
+
+ /**
+ * Returns work directory for job execution.
+ *
+ * @param locNodeId Local node ID.
+ * @param jobId Job ID.
+ * @return Working directory for job.
+ * @throws IgniteCheckedException If Failed.
+ */
+ public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
+ return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
+ }
+
+ /**
+ * Returns subdirectory of job working directory for task execution.
+ *
+ * @param locNodeId Local node ID.
+ * @param info Task info.
+ * @return Working directory for task.
+ * @throws IgniteCheckedException If Failed.
+ */
+ public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
+ File jobLocDir = jobLocalDir(locNodeId, info.jobId());
+
+ return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
+ }
+
+ /**
+ * Creates {@link Configuration} in a correct class loader context to avoid caching
+ * of inappropriate class loader in the Configuration object.
+ * @return New instance of {@link Configuration}.
+ */
+ public static Configuration safeCreateConfiguration() {
+ final ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(Configuration.class.getClassLoader());
+
+ try {
+ return new Configuration();
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..a190b14
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopBasicFileSystemFactoryDelegate.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.delegate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.hadoop.util.UserNameMapper;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Arrays;
+
+/**
+ * Basic Hadoop file system factory delegate.
+ */
+public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
+ /** Proxy. */
+ protected final HadoopFileSystemFactory proxy;
+
+ /** Configuration of the secondary filesystem, never null. */
+ protected Configuration cfg;
+
+ /** Resulting URI. */
+ protected URI fullUri;
+
+ /** User name mapper. */
+ private UserNameMapper usrNameMapper;
+
+ /**
+ * Constructor.
+ *
+ * @param proxy Proxy.
+ */
+ public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) {
+ this.proxy = proxy;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String name) throws IOException {
+ String name0 = IgfsUtils.fixUserName(name);
+
+ if (usrNameMapper != null)
+ name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
+
+ return getWithMappedName(name0);
+ }
+
+ /**
+ * Internal file system create routine.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ */
+ protected FileSystem getWithMappedName(String usrName) throws IOException {
+ assert cfg != null;
+
+ try {
+ // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
+ // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
+ // classloader to classloader of current class to avoid strange class-cast-exceptions.
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+ try {
+ return create(usrName);
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", e);
+ }
+ }
+
+ /**
+ * Internal file system creation routine, invoked in correct class loader context.
+ *
+ * @param usrName User name.
+ * @return File system.
+ * @throws IOException If failed.
+ * @throws InterruptedException if the current thread is interrupted.
+ */
+ protected FileSystem create(String usrName) throws IOException, InterruptedException {
+ return FileSystem.get(fullUri, cfg, usrName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy;
+
+ cfg = HadoopUtils.safeCreateConfiguration();
+
+ if (proxy0.getConfigPaths() != null) {
+ for (String cfgPath : proxy0.getConfigPaths()) {
+ if (cfgPath == null)
+ throw new NullPointerException("Configuration path cannot be null: " +
+ Arrays.toString(proxy0.getConfigPaths()));
+ else {
+ URL url = U.resolveIgniteUrl(cfgPath);
+
+ if (url == null) {
+ // If secConfPath is given, it should be resolvable:
+ throw new IgniteException("Failed to resolve secondary file system configuration path " +
+ "(ensure that it exists locally and you have read access to it): " + cfgPath);
+ }
+
+ cfg.addResource(url);
+ }
+ }
+ }
+
+ // If secondary fs URI is not given explicitly, try to get it from the configuration:
+ if (proxy0.getUri() == null)
+ fullUri = FileSystem.getDefaultUri(cfg);
+ else {
+ try {
+ fullUri = new URI(proxy0.getUri());
+ }
+ catch (URISyntaxException use) {
+ throw new IgniteException("Failed to resolve secondary file system URI: " + proxy0.getUri());
+ }
+ }
+
+ usrNameMapper = proxy0.getUserNameMapper();
+
+ if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
+ ((LifecycleAware)usrNameMapper).start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (usrNameMapper != null && usrNameMapper instanceof LifecycleAware)
+ ((LifecycleAware)usrNameMapper).stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..0cec8ca
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopCachingFileSystemFactoryDelegate.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
+
+import java.io.IOException;
+
+/**
+ * Caching Hadoop file system factory delegate.
+ */
+public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
+ /** Per-user file system cache. */
+ private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
+ new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
+ @Override public FileSystem createValue(String key) throws IOException {
+ return HadoopCachingFileSystemFactoryDelegate.super.getWithMappedName(key);
+ }
+ }
+ );
+
+ /**
+ * Constructor.
+ *
+ * @param proxy Proxy.
+ */
+ public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) {
+ super(proxy);
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem getWithMappedName(String name) throws IOException {
+ return cache.getOrCreate(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ super.start();
+
+ // Disable caching.
+ cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme()), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ super.stop();
+
+ try {
+ cache.close();
+ }
+ catch (IgniteCheckedException ice) {
+ throw new IgniteException(ice);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..20ac88e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopDefaultFileSystemFactoryDelegate.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.delegate;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+
+/**
+ * Hadoop file system factory delegate for non-standard factories.
+ */
+public class HadoopDefaultFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
+ /** Factory. */
+ private final HadoopFileSystemFactory factory;
+
+ /**
+ * Constructor.
+ *
+ * @param factory Factory.
+ */
+ public HadoopDefaultFileSystemFactoryDelegate(HadoopFileSystemFactory factory) {
+ assert factory != null;
+
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystem get(String usrName) throws IOException {
+ return (FileSystem)factory.get(usrName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware)factory).start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ if (factory instanceof LifecycleAware)
+ ((LifecycleAware)factory).stop();
+ }
+}