Posted to commits@ignite.apache.org by pt...@apache.org on 2016/09/26 12:13:58 UTC

[44/50] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
new file mode 100644
index 0000000..511aa5a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * The wrapper for native hadoop input splits.
+ *
+ * Warning!! This class must not depend on any Hadoop classes directly or indirectly.
+ */
+public class HadoopSplitWrapper extends HadoopInputSplit {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Serialized native hadoop input split. */
+    private byte[] bytes;
+
+    /** */
+    private String clsName;
+
+    /** Internal ID. */
+    private int id;
+
+    /**
+     * Creates new split wrapper.
+     */
+    public HadoopSplitWrapper() {
+        // No-op.
+    }
+
+    /**
+     * Creates new split wrapper.
+     *
+     * @param id Split ID.
+     * @param clsName Class name.
+     * @param bytes Serialized native split.
+     * @param hosts Hosts where split is located.
+     */
+    public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) {
+        assert hosts != null;
+        assert clsName != null;
+        assert bytes != null;
+
+        this.hosts = hosts;
+        this.id = id;
+
+        this.clsName = clsName;
+        this.bytes = bytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(id);
+
+        out.writeUTF(clsName);
+        U.writeByteArray(out, bytes);
+    }
+
+    /**
+     * @return Class name.
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @return Class bytes.
+     */
+    public byte[] bytes() {
+        return bytes;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        id = in.readInt();
+
+        clsName = in.readUTF();
+        bytes = U.readByteArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopSplitWrapper that = (HadoopSplitWrapper)o;
+
+        return id == that.id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return id;
+    }
+}
\ No newline at end of file
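
The wrapper above keeps the native split as an opaque byte array plus its class name, so the Hadoop-free side of the new architecture never has to load Hadoop types. Below is a minimal, hedged sketch of the intended round trip; it assumes HadoopInputSplit (the parent class used by this commit) declares the protected hosts field and the Externalizable contract behind writeExternal/readExternal, and the payload and class name are illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;

    public class SplitWrapperRoundTrip {
        public static void main(String[] args) throws Exception {
            byte[] payload = {1, 2, 3}; // Stands in for the Writable-serialized native split.

            HadoopSplitWrapper w = new HadoopSplitWrapper(42,
                "org.apache.hadoop.mapreduce.lib.input.FileSplit", payload, new String[] {"host1"});

            ByteArrayOutputStream bos = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(w); // Externalizable path: writes ID, class name and byte array.
            }

            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                HadoopSplitWrapper copy = (HadoopSplitWrapper)in.readObject();

                System.out.println(copy.equals(w)); // true: equality and hash code use the split ID only.
            }
        }
    }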

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
deleted file mode 100644
index 65d9810..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop utility methods.
- */
-public class HadoopUtils {
-    /** Property to store timestamp of new job id request. */
-    public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
-
-    /** Property to store timestamp of response of new job id request. */
-    public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
-
-    /** Property to store timestamp of job submission. */
-    public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
-
-    /** Property to set custom writer of job statistics. */
-    public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
-
-    /** Staging constant. */
-    private static final String STAGING_CONSTANT = ".staging";
-
-    /** Old mapper class attribute. */
-    private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
-
-    /** Old reducer class attribute. */
-    private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
-
-    /**
-     * Constructor.
-     */
-    private HadoopUtils() {
-        // No-op.
-    }
-
-    /**
-     * Wraps native split.
-     *
-     * @param id Split ID.
-     * @param split Split.
-     * @param hosts Hosts.
-     * @throws IOException If failed.
-     */
-    public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
-        ByteArrayOutputStream arr = new ByteArrayOutputStream();
-        ObjectOutput out = new ObjectOutputStream(arr);
-
-        assert split instanceof Writable;
-
-        ((Writable)split).write(out);
-
-        out.flush();
-
-        return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
-    }
-
-    /**
-     * Unwraps native split.
-     *
-     * @param o Wrapper.
-     * @return Split.
-     */
-    public static Object unwrapSplit(HadoopSplitWrapper o) {
-        try {
-            Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
-
-            w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
-
-            return w;
-        }
-        catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    /**
-     * Convert Ignite job status to Hadoop job status.
-     *
-     * @param status Ignite job status.
-     * @return Hadoop job status.
-     */
-    public static JobStatus status(HadoopJobStatus status, Configuration conf) {
-        JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
-
-        float setupProgress = 0;
-        float mapProgress = 0;
-        float reduceProgress = 0;
-        float cleanupProgress = 0;
-
-        JobStatus.State state = JobStatus.State.RUNNING;
-
-        switch (status.jobPhase()) {
-            case PHASE_SETUP:
-                setupProgress = 0.42f;
-
-                break;
-
-            case PHASE_MAP:
-                setupProgress = 1;
-                mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
-
-                break;
-
-            case PHASE_REDUCE:
-                setupProgress = 1;
-                mapProgress = 1;
-
-                if (status.totalReducerCnt() > 0)
-                    reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
-                else
-                    reduceProgress = 1f;
-
-                break;
-
-            case PHASE_CANCELLING:
-            case PHASE_COMPLETE:
-                if (!status.isFailed()) {
-                    setupProgress = 1;
-                    mapProgress = 1;
-                    reduceProgress = 1;
-                    cleanupProgress = 1;
-
-                    state = JobStatus.State.SUCCEEDED;
-                }
-                else
-                    state = JobStatus.State.FAILED;
-
-                break;
-
-            default:
-                assert false;
-        }
-
-        return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
-            JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
-    }
-
-    /**
-     * Gets staging area directory.
-     *
-     * @param conf Configuration.
-     * @param usr User.
-     * @return Staging area directory.
-     */
-    public static Path stagingAreaDir(Configuration conf, String usr) {
-        return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
-            + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
-    }
-
-    /**
-     * Gets job file.
-     *
-     * @param conf Configuration.
-     * @param usr User.
-     * @param jobId Job ID.
-     * @return Job file.
-     */
-    public static Path jobFile(Configuration conf, String usr, JobID jobId) {
-        return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
-    }
-
-    /**
-     * Checks the attribute in configuration is not set.
-     *
-     * @param attr Attribute name.
-     * @param msg Message for creation of exception.
-     * @throws IgniteCheckedException If attribute is set.
-     */
-    public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
-        if (cfg.get(attr) != null)
-            throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
-    }
-
-    /**
-     * Creates JobInfo from hadoop configuration.
-     *
-     * @param cfg Hadoop configuration.
-     * @return Job info.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
-        JobConf jobConf = new JobConf(cfg);
-
-        boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
-                || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
-
-        int numReduces = jobConf.getNumReduceTasks();
-
-        jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
-
-        if (jobConf.getUseNewMapper()) {
-            String mode = "new map API";
-
-            ensureNotSet(jobConf, "mapred.input.format.class", mode);
-            ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
-
-            if (numReduces != 0)
-                ensureNotSet(jobConf, "mapred.partitioner.class", mode);
-            else
-                ensureNotSet(jobConf, "mapred.output.format.class", mode);
-        }
-        else {
-            String mode = "map compatibility";
-
-            ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
-            ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
-
-            if (numReduces != 0)
-                ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
-            else
-                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-        }
-
-        if (numReduces != 0) {
-            jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
-
-            if (jobConf.getUseNewReducer()) {
-                String mode = "new reduce API";
-
-                ensureNotSet(jobConf, "mapred.output.format.class", mode);
-                ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
-            }
-            else {
-                String mode = "reduce compatibility";
-
-                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-                ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
-            }
-        }
-
-        Map<String, String> props = new HashMap<>();
-
-        for (Map.Entry<String, String> entry : jobConf)
-            props.put(entry.getKey(), entry.getValue());
-
-        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
-    }
-
-    /**
-     * Throws new {@link IgniteCheckedException} with original exception is serialized into string.
-     * This is needed to transfer error outside the current class loader.
-     *
-     * @param e Original exception.
-     * @return IgniteCheckedException New exception.
-     */
-    public static IgniteCheckedException transformException(Throwable e) {
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-
-        e.printStackTrace(new PrintStream(os, true));
-
-        return new IgniteCheckedException(os.toString());
-    }
-
-    /**
-     * Returns work directory for job execution.
-     *
-     * @param locNodeId Local node ID.
-     * @param jobId Job ID.
-     * @return Working directory for job.
-     * @throws IgniteCheckedException If Failed.
-     */
-    public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
-        return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
-    }
-
-    /**
-     * Returns subdirectory of job working directory for task execution.
-     *
-     * @param locNodeId Local node ID.
-     * @param info Task info.
-     * @return Working directory for task.
-     * @throws IgniteCheckedException If Failed.
-     */
-    public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
-        File jobLocDir = jobLocalDir(locNodeId, info.jobId());
-
-        return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
-    }
-
-    /**
-     * Creates {@link Configuration} in a correct class loader context to avoid caching
-     * of inappropriate class loader in the Configuration object.
-     * @return New instance of {@link Configuration}.
-     */
-    public static Configuration safeCreateConfiguration() {
-        final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader());
-
-        try {
-            return new Configuration();
-        }
-        finally {
-            restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /**
-     * Sort input splits by length.
-     *
-     * @param splits Splits.
-     * @return Sorted splits.
-     */
-    public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
-        int id = 0;
-
-        TreeSet<SplitSortWrapper> sortedSplits = new TreeSet<>();
-
-        for (HadoopInputSplit split : splits) {
-            long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0;
-
-            sortedSplits.add(new SplitSortWrapper(id++, split, len));
-        }
-
-        ArrayList<HadoopInputSplit> res = new ArrayList<>(sortedSplits.size());
-
-        for (SplitSortWrapper sortedSplit : sortedSplits)
-            res.add(sortedSplit.split);
-
-        return res;
-    }
-
-    /**
-     * Set context class loader.
-     *
-     * @param newLdr New class loader.
-     * @return Old class loader.
-     */
-    @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) {
-        ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();
-
-        if (newLdr != oldLdr)
-            Thread.currentThread().setContextClassLoader(newLdr);
-
-        return oldLdr;
-    }
-
-    /**
-     * Restore context class loader.
-     *
-     * @param oldLdr Original class loader.
-     */
-    public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) {
-        ClassLoader newLdr = Thread.currentThread().getContextClassLoader();
-
-        if (newLdr != oldLdr)
-            Thread.currentThread().setContextClassLoader(oldLdr);
-    }
-
-    /**
-     * Split wrapper for sorting.
-     */
-    private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
-        /** Unique ID. */
-        private final int id;
-
-        /** Split. */
-        private final HadoopInputSplit split;
-
-        /** Split length. */
-        private final long len;
-
-        /**
-         * Constructor.
-         *
-         * @param id Unique ID.
-         * @param split Split.
-         * @param len Split length.
-         */
-        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
-            this.id = id;
-            this.split = split;
-            this.len = len;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("NullableProblems")
-        @Override public int compareTo(SplitSortWrapper other) {
-            assert other != null;
-
-            long res = len - other.len;
-
-            if (res > 0)
-                return -1;
-            else if (res < 0)
-                return 1;
-            else
-                return id - other.id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return id;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
-        }
-    }
-}
\ No newline at end of file
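
HadoopUtils disappears here because the new architecture splits it: the Hadoop-dependent helpers move behind the Hadoop class loader, while the Hadoop-free constants resurface as HadoopCommonUtils (see the HadoopPerformanceCounter change below). The pattern worth noting is the context class loader swap used by safeCreateConfiguration(): run Hadoop-touching code under the loader that owns the Hadoop classes, then restore the caller's loader. A hedged, stand-alone sketch of that pattern:

    import org.apache.hadoop.conf.Configuration;

    public class ContextLoaderSwap {
        public static Configuration safeConfiguration() {
            ClassLoader hadoopLdr = Configuration.class.getClassLoader(); // Loader that owns the Hadoop classes.
            ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();

            if (hadoopLdr != oldLdr)
                Thread.currentThread().setContextClassLoader(hadoopLdr);

            try {
                // Configuration captures the context class loader on creation, so this way it
                // caches the Hadoop loader rather than whichever loader the caller happened to have.
                return new Configuration();
            }
            finally {
                if (Thread.currentThread().getContextClassLoader() != oldLdr)
                    Thread.currentThread().setContextClassLoader(oldLdr);
            }
        }
    }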

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
index 3f682d3..ee61a82 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
index dedc6b3..9baedc2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -24,6 +24,8 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.UUID;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
@@ -32,10 +34,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY;
-
 /**
  * Counter for the job statistics accumulation.
  */
@@ -221,9 +219,9 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
     public void clientSubmissionEvents(HadoopJobInfo info) {
         assert nodeId != null;
 
-        addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
-        addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
-        addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
+        addEventFromProperty("JOB requestId", info, HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB responseId", info, HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY);
+        addEventFromProperty("JOB submit", info, HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
new file mode 100644
index 0000000..76d9bff
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopDelegateUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.hadoop.fs.KerberosHadoopFileSystemFactory;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.jetbrains.annotations.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility methods for Hadoop delegates.
+ */
+public class HadoopDelegateUtils {
+    /** Secondary file system delegate class. */
+    private static final String SECONDARY_FILE_SYSTEM_CLS =
+        "org.apache.ignite.internal.processors.hadoop.impl.delegate.HadoopIgfsSecondaryFileSystemDelegateImpl";
+
+    /** Default file system factory class. */
+    private static final String DFLT_FACTORY_CLS =
+        "org.apache.ignite.internal.processors.hadoop.impl.delegate.HadoopDefaultFileSystemFactoryDelegate";
+
+    /** Factory proxy to delegate class name mapping. */
+    private static final Map<String, String> FACTORY_CLS_MAP;
+
+    /** Counter writer delegate implementation. */
+    private static final String COUNTER_WRITER_DELEGATE_CLS =
+        "org.apache.ignite.internal.processors.hadoop.impl.delegate.HadoopFileSystemCounterWriterDelegateImpl";
+
+    static {
+        FACTORY_CLS_MAP = new HashMap<>();
+
+        FACTORY_CLS_MAP.put(BasicHadoopFileSystemFactory.class.getName(),
+            "org.apache.ignite.internal.processors.hadoop.impl.delegate.HadoopBasicFileSystemFactoryDelegate");
+
+        FACTORY_CLS_MAP.put(CachingHadoopFileSystemFactory.class.getName(),
+            "org.apache.ignite.internal.processors.hadoop.impl.delegate.HadoopCachingFileSystemFactoryDelegate");
+
+        FACTORY_CLS_MAP.put(KerberosHadoopFileSystemFactory.class.getName(),
+            "org.apache.ignite.internal.processors.hadoop.impl.delegate.HadoopKerberosFileSystemFactoryDelegate");
+    }
+
+    /**
+     * Create delegate for secondary file system.
+     *
+     * @param ldr Hadoop class loader.
+     * @param proxy Proxy.
+     * @return Delegate.
+     */
+    public static HadoopIgfsSecondaryFileSystemDelegate secondaryFileSystemDelegate(HadoopClassLoader ldr,
+        IgniteHadoopIgfsSecondaryFileSystem proxy) {
+        return newInstance(SECONDARY_FILE_SYSTEM_CLS, ldr, proxy);
+    }
+
+    /**
+     * Create delegate for certain file system factory.
+     *
+     * @param proxy Proxy.
+     * @return Delegate.
+     */
+    @SuppressWarnings("unchecked")
+    public static HadoopFileSystemFactoryDelegate fileSystemFactoryDelegate(Object proxy) {
+        String clsName = FACTORY_CLS_MAP.get(proxy.getClass().getName());
+
+        if (clsName == null)
+            clsName = DFLT_FACTORY_CLS;
+
+        return newInstance(clsName, null, proxy);
+    }
+
+    /**
+     * Create delegate for Hadoop counter writer.
+     *
+     * @param ldr Class loader.
+     * @param proxy Proxy.
+     * @return Delegate.
+     */
+    public static HadoopFileSystemCounterWriterDelegate counterWriterDelegate(ClassLoader ldr,
+        IgniteHadoopFileSystemCounterWriter proxy) {
+        return newInstance(COUNTER_WRITER_DELEGATE_CLS, ldr, proxy);
+    }
+
+    /**
+     * Get new delegate instance.
+     *
+     * @param clsName Class name.
+     * @param ldr Optional class loader.
+     * @param proxy Proxy.
+     * @return Instance.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> T newInstance(String clsName, @Nullable ClassLoader ldr, Object proxy) {
+        try {
+            Class delegateCls = ldr == null ? Class.forName(clsName) : Class.forName(clsName, true, ldr);
+
+            Constructor[] ctors = delegateCls.getConstructors();
+
+            assert ctors.length == 1;
+
+            Object res = ctors[0].newInstance(proxy);
+
+            return (T)res;
+        }
+        catch (ReflectiveOperationException e) {
+            throw new IgniteException("Failed to instantiate delegate for proxy [proxy=" + proxy +
+                ", delegateClsName=" + clsName + ']', e);
+        }
+    }
+
+    /**
+     * Private constructor.
+     */
+    private HadoopDelegateUtils() {
+        // No-op.
+    }
+}
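
HadoopDelegateUtils is the seam between the public, Hadoop-free API classes and their Hadoop-dependent implementations in the impl package: the delegate is resolved by class name and instantiated reflectively, optionally through the supplied HadoopClassLoader, so callers never link against Hadoop directly. A hedged caller-side sketch, assuming the delegate implementation classes are resolvable from the chosen class loader (as in embedded execution mode):

    import java.io.IOException;

    import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
    import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
    import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFactoryDelegate;

    public class FactoryDelegateUsage {
        public static void main(String[] args) throws IOException {
            // Public, Hadoop-free factory; its configuration (URI, config paths) is omitted here.
            BasicHadoopFileSystemFactory factory = new BasicHadoopFileSystemFactory();

            // Looked up via FACTORY_CLS_MAP and instantiated through its single constructor.
            HadoopFileSystemFactoryDelegate delegate = HadoopDelegateUtils.fileSystemFactoryDelegate(factory);

            delegate.start(); // LifecycleAware: initializes the wrapped factory.

            try {
                // Returns a Hadoop FileSystem, exposed as Object to keep this side Hadoop-free.
                Object fs = delegate.get("ignite");

                System.out.println(fs);
            }
            finally {
                delegate.stop();
            }
        }
    }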

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
new file mode 100644
index 0000000..541cf80
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+
+/**
+ * Counter writer delegate interface.
+ */
+public interface HadoopFileSystemCounterWriterDelegate {
+    /**
+     * Writes counters of given job to some statistics storage.
+     *
+     * @param job The job.
+     * @param cntrs Counters.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java
new file mode 100644
index 0000000..f051d62
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemFactoryDelegate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+import java.io.IOException;
+
+/**
+ * Hadoop file system factory delegate.
+ */
+public interface HadoopFileSystemFactoryDelegate extends LifecycleAware {
+    /**
+     * Gets file system for the given user name.
+     *
+     * @param usrName User name.
+     * @return File system.
+     * @throws IOException In case of error.
+     */
+    public Object get(String usrName) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java
new file mode 100644
index 0000000..e381272
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopIgfsSecondaryFileSystemDelegate.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.delegate;
+
+import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemV2;
+import org.apache.ignite.lifecycle.LifecycleAware;
+
+/**
+ * Interface to secondary file system implementation.
+ */
+public interface HadoopIgfsSecondaryFileSystemDelegate extends IgfsSecondaryFileSystemV2, LifecycleAware {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
deleted file mode 100644
index 1ecbee5..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtils.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.IOException;
-import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
-import org.apache.ignite.internal.util.GridStringBuilder;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * File system cache utility methods used by Map-Reduce tasks and jobs.
- */
-public class HadoopFileSystemCacheUtils {
-    /**
-     * A common static factory method. Creates new HadoopLazyConcurrentMap.
-     * @return a new HadoopLazyConcurrentMap.
-     */
-    public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() {
-        return new HadoopLazyConcurrentMap<>(
-            new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() {
-                @Override public FileSystem createValue(FsCacheKey key) throws IOException {
-                    try {
-                        assert key != null;
-
-                        // Explicitly disable FileSystem caching:
-                        URI uri = key.uri();
-
-                        String scheme = uri.getScheme();
-
-                        // Copy the configuration to avoid altering the external object.
-                        Configuration cfg = new Configuration(key.configuration());
-
-                        String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme);
-
-                        cfg.setBoolean(prop, true);
-
-                        return FileSystem.get(uri, cfg, key.user());
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        throw new IOException("Failed to create file system due to interrupt.", e);
-                    }
-                }
-            }
-        );
-    }
-
-    /**
-     * Gets non-null user name as per the Hadoop viewpoint.
-     * @param cfg the Hadoop job configuration, may be null.
-     * @return the user name, never null.
-     */
-    private static String getMrHadoopUser(Configuration cfg) throws IOException {
-        String user = cfg.get(MRJobConfig.USER_NAME);
-
-        if (user == null)
-            user = IgniteHadoopFileSystem.getFsHadoopUser();
-
-        return user;
-    }
-
-    /**
-     * Common method to get the V1 file system in MapRed engine.
-     * It gets the filesystem for the user specified in the
-     * configuration with {@link MRJobConfig#USER_NAME} property.
-     * The file systems are created and cached in the given map upon first request.
-     *
-     * @param uri The file system uri.
-     * @param cfg The configuration.
-     * @param map The caching map.
-     * @return The file system.
-     * @throws IOException On error.
-     */
-    public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg,
-        HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map)
-            throws IOException {
-        assert map != null;
-        assert cfg != null;
-
-        final String usr = getMrHadoopUser(cfg);
-
-        assert usr != null;
-
-        if (uri == null)
-            uri = FileSystem.getDefaultUri(cfg);
-
-        final FileSystem fs;
-
-        try {
-            final FsCacheKey key = new FsCacheKey(uri, usr, cfg);
-
-            fs = map.getOrCreate(key);
-        }
-        catch (IgniteException ie) {
-            throw new IOException(ie);
-        }
-
-        assert fs != null;
-        assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user());
-
-        return fs;
-    }
-
-    /**
-     * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3).
-     * @param uri0 The uri.
-     * @param cfg The cfg.
-     * @return Correct URI.
-     */
-    private static URI fixUri(URI uri0, Configuration cfg) {
-        if (uri0 == null)
-            return FileSystem.getDefaultUri(cfg);
-
-        String scheme = uri0.getScheme();
-        String authority = uri0.getAuthority();
-
-        if (authority == null) {
-            URI dfltUri = FileSystem.getDefaultUri(cfg);
-
-            if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null))
-                return dfltUri;
-        }
-
-        return uri0;
-    }
-
-    /**
-     * Note that configuration is not a part of the key.
-     * It is used solely to initialize the first instance
-     * that is created for the key.
-     */
-    public static final class FsCacheKey {
-        /** */
-        private final URI uri;
-
-        /** */
-        private final String usr;
-
-        /** */
-        private final String equalityKey;
-
-        /** */
-        private final Configuration cfg;
-
-        /**
-         * Constructor
-         */
-        public FsCacheKey(URI uri, String usr, Configuration cfg) {
-            assert uri != null;
-            assert usr != null;
-            assert cfg != null;
-
-            this.uri = fixUri(uri, cfg);
-            this.usr = usr;
-            this.cfg = cfg;
-
-            this.equalityKey = createEqualityKey();
-        }
-
-        /**
-         * Creates String key used for equality and hashing.
-         */
-        private String createEqualityKey() {
-            GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@");
-
-            if (uri.getScheme() != null)
-                sb.a(uri.getScheme().toLowerCase());
-
-            sb.a("://");
-
-            if (uri.getAuthority() != null)
-                sb.a(uri.getAuthority().toLowerCase());
-
-            return sb.toString();
-        }
-
-        /**
-         * The URI.
-         */
-        public URI uri() {
-            return uri;
-        }
-
-        /**
-         * The User.
-         */
-        public String user() {
-            return usr;
-        }
-
-        /**
-         * The Configuration.
-         */
-        public Configuration configuration() {
-            return cfg;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("SimplifiableIfStatement")
-        @Override public boolean equals(Object obj) {
-            if (obj == this)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            return equalityKey.equals(((FsCacheKey)obj).equalityKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return equalityKey.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return equalityKey;
-        }
-    }
-}
\ No newline at end of file
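
The deleted cache keyed file systems by scheme, authority and user; the Configuration only seeds the first instance for a key, which is why FsCacheKey builds its equality key from exactly those three parts. A hedged sketch of how the utility was typically consumed before its relocation under the new architecture:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils;
    import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
    import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;

    public class FsCacheUsage {
        public static void main(String[] args) throws IOException {
            HadoopLazyConcurrentMap<FsCacheKey, FileSystem> cache =
                HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap();

            Configuration cfg = new Configuration(); // User resolved from MRJobConfig.USER_NAME, if set.

            // First call creates and caches the file system for (default URI, user); later calls
            // with the same scheme, authority and user return the cached instance.
            FileSystem fs = HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching(null, cfg, cache);

            System.out.println(fs.getUri());
        }
    }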

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
deleted file mode 100644
index 68c0dc4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FsConstants;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Utilities for configuring file systems to support the separate working directory per each thread.
- */
-public class HadoopFileSystemsUtils {
-    /** Name of the property for setting working directory on create new local FS instance. */
-    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
-    /**
-     * Setup wrappers of filesystems to support the separate working directory.
-     *
-     * @param cfg Config for setup.
-     */
-    public static void setupFileSystems(Configuration cfg) {
-        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
-        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
-                HadoopLocalFileSystemV2.class.getName());
-    }
-
-    /**
-     * Gets the property name to disable file system cache.
-     * @param scheme The file system URI scheme.
-     * @return The property name. If scheme is null,
-     * returns "fs.null.impl.disable.cache".
-     */
-    public static String disableFsCachePropertyName(@Nullable String scheme) {
-        return String.format("fs.%s.impl.disable.cache", scheme);
-    }
-}
\ No newline at end of file
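
These helpers only manipulate configuration keys: they point the local file system schemes at the Ignite wrappers (which give each thread its own working directory) and expose the per-scheme switch for bypassing Hadoop's FileSystem cache. A short, hedged sketch of the keys involved:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;

    public class FsConfigKeys {
        public static void main(String[] args) {
            Configuration cfg = new Configuration();

            HadoopFileSystemsUtils.setupFileSystems(cfg);
            // cfg now contains:
            //   fs.file.impl                    -> HadoopLocalFileSystemV1
            //   fs.AbstractFileSystem.file.impl -> HadoopLocalFileSystemV2

            // Per-scheme cache switch, e.g. "fs.hdfs.impl.disable.cache".
            cfg.setBoolean(HadoopFileSystemsUtils.disableFsCachePropertyName("hdfs"), true);
        }
    }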

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
deleted file mode 100644
index 681cddb..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- * Maps values by keys.
- * Values are created lazily using {@link ValueFactory}.
- *
- * Despite of the name, does not depend on any Hadoop classes.
- */
-public class HadoopLazyConcurrentMap<K, V extends Closeable> {
-    /** The map storing the actual values. */
-    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
-
-    /** The factory passed in by the client. Will be used for lazy value creation. */
-    private final ValueFactory<K, V> factory;
-
-    /** Lock used to close the objects. */
-    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
-
-    /** Flag indicating that this map is closed and cleared. */
-    private boolean closed;
-
-    /**
-     * Constructor.
-     * @param factory the factory to create new values lazily.
-     */
-    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
-        this.factory = factory;
-
-        assert getClass().getClassLoader() == Ignite.class.getClassLoader();
-    }
-
-    /**
-     * Gets cached or creates a new value of V.
-     * Never returns null.
-     * @param k the key to associate the value with.
-     * @return the cached or newly created value, never null.
-     * @throws IgniteException on error
-     */
-    public V getOrCreate(K k) {
-        ValueWrapper w = map.get(k);
-
-        if (w == null) {
-            closeLock.readLock().lock();
-
-            try {
-                if (closed)
-                    throw new IllegalStateException("Failed to create value for key [" + k
-                        + "]: the map is already closed.");
-
-                final ValueWrapper wNew = new ValueWrapper(k);
-
-                w = map.putIfAbsent(k, wNew);
-
-                if (w == null) {
-                    wNew.init();
-
-                    w = wNew;
-                }
-            }
-            finally {
-                closeLock.readLock().unlock();
-            }
-        }
-
-        try {
-            V v = w.getValue();
-
-            assert v != null;
-
-            return v;
-        }
-        catch (IgniteCheckedException ie) {
-            throw new IgniteException(ie);
-        }
-    }
-
-    /**
-     * Clears the map and closes all the values.
-     */
-    public void close() throws IgniteCheckedException {
-        closeLock.writeLock().lock();
-
-        try {
-            if (closed)
-                return;
-
-            closed = true;
-
-            Exception err = null;
-
-            Set<K> keySet = map.keySet();
-
-            for (K key : keySet) {
-                V v = null;
-
-                try {
-                    v = map.get(key).getValue();
-                }
-                catch (IgniteCheckedException ignore) {
-                    // No-op.
-                }
-
-                if (v != null) {
-                    try {
-                        v.close();
-                    }
-                    catch (Exception err0) {
-                        if (err == null)
-                            err = err0;
-                    }
-                }
-            }
-
-            map.clear();
-
-            if (err != null)
-                throw new IgniteCheckedException(err);
-        }
-        finally {
-            closeLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * Helper class that drives the lazy value creation.
-     */
-    private class ValueWrapper {
-        /** Future. */
-        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
-
-        /** the key */
-        private final K key;
-
-        /**
-         * Creates new wrapper.
-         */
-        private ValueWrapper(K key) {
-            this.key = key;
-        }
-
-        /**
-         * Initializes the value using the factory.
-         */
-        private void init() {
-            try {
-                final V v0 = factory.createValue(key);
-
-                if (v0 == null)
-                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
-
-                fut.onDone(v0);
-            }
-            catch (Throwable e) {
-                fut.onDone(e);
-            }
-        }
-
-        /**
-         * Gets the available value or blocks until the value is initialized.
-         * @return the value, never null.
-         * @throws IgniteCheckedException on error.
-         */
-        V getValue() throws IgniteCheckedException {
-            return fut.get();
-        }
-    }
-
-    /**
-     * Interface representing the factory that creates map values.
-     * @param <K> the type of the key.
-     * @param <V> the type of the value.
-     */
-    public interface ValueFactory <K, V> {
-        /**
-         * Creates the new value. Should never return null.
-         *
-         * @param key the key to create value for
-         * @return the value.
-         * @throws IOException On failure.
-         */
-        public V createValue(K key) throws IOException;
-    }
-}
\ No newline at end of file
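
The map's contract is simple: a value is created at most once per key by the supplied factory, getOrCreate() blocks concurrent callers on the same key until that single creation finishes, and close() closes every created value and rejects further use. A hedged, Hadoop-free sketch of that contract (StringReader appears only because the value type must be Closeable):

    import java.io.StringReader;

    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;

    public class LazyMapUsage {
        public static void main(String[] args) throws IgniteCheckedException {
            HadoopLazyConcurrentMap<String, StringReader> map = new HadoopLazyConcurrentMap<>(
                new HadoopLazyConcurrentMap.ValueFactory<String, StringReader>() {
                    @Override public StringReader createValue(String key) {
                        return new StringReader(key); // Invoked once per key, lazily.
                    }
                });

            StringReader r1 = map.getOrCreate("a");
            StringReader r2 = map.getOrCreate("a");

            System.out.println(r1 == r2); // true: the same cached instance is returned.

            map.close(); // Closes all cached values; further getOrCreate() calls now fail.
        }
    }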

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
deleted file mode 100644
index cbb007f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.File;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class HadoopLocalFileSystemV1 extends LocalFileSystem {
-    /**
-     * Creates new local file system.
-     */
-    public HadoopLocalFileSystemV1() {
-        super(new HadoopRawLocalFileSystem());
-    }
-
-    /** {@inheritDoc} */
-    @Override public File pathToFile(Path path) {
-        return ((HadoopRawLocalFileSystem)getRaw()).convert(path);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
deleted file mode 100644
index 2484492..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFs;
-import org.apache.hadoop.fs.DelegateToFileSystem;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.local.LocalConfigKeys;
-
-import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class HadoopLocalFileSystemV2 extends ChecksumFs {
-    /**
-     * Creates new local file system.
-     *
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
-        super(new DelegateFS(cfg));
-    }
-
-    /**
-     * Creates new local file system.
-     *
-     * @param uri URI.
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
-        this(cfg);
-    }
-
-    /**
-     * Delegate file system.
-     */
-    private static class DelegateFS extends DelegateToFileSystem {
-        /**
-         * Creates new local file system.
-         *
-         * @param cfg Configuration.
-         * @throws IOException If failed.
-         * @throws URISyntaxException If failed.
-         */
-        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
-            super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getUriDefaultPort() {
-            return -1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public FsServerDefaults getServerDefaults() throws IOException {
-            return LocalConfigKeys.getServerDefaults();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isValidName(String src) {
-            return true;
-        }
-    }
-}
\ No newline at end of file
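
HadoopLocalFileSystemV1 (FileSystem API) and HadoopLocalFileSystemV2 (AbstractFileSystem API) both delegate to HadoopRawLocalFileSystem. A hedged sketch of how such replacements could be registered for the file:// scheme through the standard Hadoop configuration keys; this wiring is an assumption for illustration, not necessarily how Ignite registers them:

    // Assumes: org.apache.hadoop.conf.Configuration
    Configuration conf = new Configuration();

    // FileSystem (v1) implementation for file:// paths.
    conf.set("fs.file.impl",
        "org.apache.ignite.internal.processors.hadoop.fs.HadoopLocalFileSystemV1");

    // AbstractFileSystem (v2) implementation for file:// paths.
    conf.set("fs.AbstractFileSystem.file.impl",
        "org.apache.ignite.internal.processors.hadoop.fs.HadoopLocalFileSystemV2");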

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
deleted file mode 100644
index 0aac4a3..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-/**
- * This class lists parameters that can be specified in Hadoop configuration.
- * Hadoop configuration can be specified in {@code core-site.xml} file
- * or passed to map-reduce task directly when using Hadoop driver for IGFS file system:
- * <ul>
- *     <li>
- *         {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
- *         the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
- *         IGFS data node configuration property.
- *     </li>
- *     <li>
- *         {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
- *         {@code true}, then all file system operations will be logged to a file.
- *     </li>
- *     <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
- *     <li>
- *         {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
- *         it gets flushed to the log file. Higher values imply greater performance, but increase the delay
- *         before a record appears in the log file.
- *     </li>
- *     <li>
- *         {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
- *         node to which client is connected. If {@code true}, file will not be distributed and will be written
- *         to a single data node. Default value is {@code true}.
- *     </li>
- *     <li>
- *         {@code fs.igfs.prefer.local.writes} - specifies whether a file should preferably be written to
- *         the local data node if it has enough free space. It may later be redistributed across nodes.
- *     </li>
- * </ul>
- * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
- * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
- * <p>
- * Sample configuration that can be placed to {@code core-site.xml} file:
- * <pre name="code" class="xml">
- *     &lt;property&gt;
- *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.enabled&lt;/name&gt;
- *         &lt;value&gt;true&lt;/value&gt;
- *     &lt;/property&gt;
- *     &lt;property&gt;
- *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.dir&lt;/name&gt;
- *         &lt;value&gt;/home/apache/ignite/log/sampling&lt;/value&gt;
- *     &lt;/property&gt;
- *     &lt;property&gt;
- *         &lt;name&gt;fs.igfs.127.0.0.1:10500.log.batch_size&lt;/name&gt;
- *         &lt;value&gt;16&lt;/value&gt;
- *     &lt;/property&gt;
- * </pre>
- * Parameters could also be specified per mapreduce job, e.g.
- * <pre name="code" class="bash">
- * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
- * </pre>
- * If you want to use these parameters in code, then you have to substitute your file system name in it. The easiest
- * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
- */
-public class HadoopParameters {
-    /** Parameter name for control over file colocation write mode. */
-    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
-
-    /** Parameter name for custom sequential reads before prefetch value. */
-    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
-        "fs.igfs.%s.open.sequential_reads_before_prefetch";
-
-    /** Parameter name for client logger directory. */
-    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
-
-    /** Parameter name for log batch size. */
-    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
-
-    /** Parameter name for log enabled flag. */
-    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
-
-    /** Parameter name for prefer local writes flag. */
-    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
-}
\ No newline at end of file
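
As the class javadoc above suggests, per-endpoint parameter names are produced with String.format; a minimal sketch, assuming an endpoint name of 127.0.0.1:10500 and example values:

    // Assumes: org.apache.hadoop.conf.Configuration
    Configuration conf = new Configuration();

    String name = "127.0.0.1:10500"; // File system endpoint from the IGFS URI authority part.

    // Enable the sampling log and direct it to a directory for this endpoint.
    conf.setBoolean(String.format(HadoopParameters.PARAM_IGFS_LOG_ENABLED, name), true);
    conf.set(String.format(HadoopParameters.PARAM_IGFS_LOG_DIR, name), "/home/apache/ignite/log/sampling");

    // Disable colocated writes so files are distributed across data nodes.
    conf.setBoolean(String.format(HadoopParameters.PARAM_IGFS_COLOCATED_WRITES, name), false);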

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
deleted file mode 100644
index b8fc8e7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.nio.file.Files;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsConstants;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Local file system implementation for Hadoop.
- */
-public class HadoopRawLocalFileSystem extends FileSystem {
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
-        @Override protected Path initialValue() {
-            return getInitialWorkingDirectory();
-        }
-    };
-
-    /**
-     * Converts Hadoop path to local path.
-     *
-     * @param path Hadoop path.
-     * @return Local path.
-     */
-    File convert(Path path) {
-        checkPath(path);
-
-        if (path.isAbsolute())
-            return new File(path.toUri().getPath());
-
-        return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        return makeQualified(new Path(System.getProperty("user.home")));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getInitialWorkingDirectory() {
-        File f = new File(System.getProperty("user.dir"));
-
-        return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setConf(conf);
-
-        String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
-
-        if (initWorkDir != null)
-            setWorkingDirectory(new Path(initWorkDir));
-    }
-
-    /** {@inheritDoc} */
-    @Override public URI getUri() {
-        return FsConstants.LOCAL_FS_URI;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-        return new FSDataInputStream(new InStream(checkExists(convert(f))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
-        short replication, long blockSize, Progressable progress) throws IOException {
-        File file = convert(f);
-
-        if (!overwrite && !file.createNewFile())
-            throw new IOException("Failed to create new file: " + f.toUri());
-
-        return out(file, false, bufSize);
-    }
-
-    /**
-     * @param file File.
-     * @param append Append flag.
-     * @return Output stream.
-     * @throws IOException If failed.
-     */
-    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
-        return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
-            bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
-        return out(convert(f), true, bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean rename(Path src, Path dst) throws IOException {
-        return convert(src).renameTo(convert(dst));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        File file = convert(f);
-
-        if (file.isDirectory() && !recursive)
-            throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
-
-        return U.delete(file);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        workDir.set(fixRelativePart(dir));
-
-        checkPath(dir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workDir.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-        if(f == null)
-            throw new IllegalArgumentException("mkdirs path arg is null");
-
-        Path parent = f.getParent();
-
-        File p2f = convert(f);
-
-        if(parent != null) {
-            File parent2f = convert(parent);
-
-            if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
-                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
-
-        }
-
-        return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        return fileStatus(checkExists(convert(f)));
-    }
-
-    /**
-     * @return File status.
-     */
-    private FileStatus fileStatus(File file) throws IOException {
-        boolean dir = file.isDirectory();
-
-        java.nio.file.Path path = dir ? null : file.toPath();
-
-        return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
-            /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
-            new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
-    }
-
-    /**
-     * @param file File.
-     * @return Same file.
-     * @throws FileNotFoundException If the file does not exist.
-     */
-    private static File checkExists(File file) throws FileNotFoundException {
-        if (!file.exists())
-            throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
-
-        return file;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        File file = convert(f);
-
-        if (checkExists(file).isFile())
-            return new FileStatus[] {fileStatus(file)};
-
-        File[] files = file.listFiles();
-
-        FileStatus[] res = new FileStatus[files.length];
-
-        for (int i = 0; i < res.length; i++)
-            res[i] = fileStatus(files[i]);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsSymlinks() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
-        Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
-        return getFileStatus(getLinkTarget(f));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getLinkTarget(Path f) throws IOException {
-        File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
-
-        return new Path(file.toURI());
-    }
-
-    /**
-     * Input stream.
-     */
-    private static class InStream extends InputStream implements Seekable, PositionedReadable {
-        /** */
-        private final RandomAccessFile file;
-
-        /**
-         * @param f File.
-         * @throws IOException If failed.
-         */
-        public InStream(File f) throws IOException {
-            file = new RandomAccessFile(f, "r");
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read() throws IOException {
-            return file.read();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
-            return file.read(b, off, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void close() throws IOException {
-            file.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-            long pos0 = file.getFilePointer();
-
-            file.seek(pos);
-            int res = file.read(buf, off, len);
-
-            file.seek(pos0);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-            if (read(pos, buf, off, len) != len)
-                throw new IOException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf) throws IOException {
-            readFully(pos, buf, 0, buf.length);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void seek(long pos) throws IOException {
-            file.seek(pos);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized long getPos() throws IOException {
-            return file.getFilePointer();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean seekToNewSource(long targetPos) throws IOException {
-            return false;
-        }
-    }
-}
\ No newline at end of file
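
A short usage sketch of the raw local file system above, based only on the methods shown; direct instantiation and the literal paths are assumptions for illustration:

    // Assumes: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.*
    HadoopRawLocalFileSystem fs = new HadoopRawLocalFileSystem();

    fs.initialize(FsConstants.LOCAL_FS_URI, new Configuration());

    Path file = new Path("/tmp/example.txt");

    // create() buffers writes, raising small bufSize values to a 32 KB buffer.
    try (FSDataOutputStream out = fs.create(file, null, true, 4096, (short)1, 4096L, null)) {
        out.writeUTF("hello");
    }

    // open() returns a seekable, positioned-readable stream over a RandomAccessFile.
    try (FSDataInputStream in = fs.open(file, 4096)) {
        System.out.println(in.readUTF());
    }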

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
deleted file mode 100644
index fe43596..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Facade for communication with grid.
- */
-public interface HadoopIgfs {
-    /**
-     * Perform handshake.
-     *
-     * @param logDir Log directory.
-     * @return Future with handshake result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
-
-    /**
-     * Close connection.
-     *
-     * @param force Force flag.
-     */
-    public void close(boolean force);
-
-    /**
-     * Command to retrieve file info for some IGFS path.
-     *
-     * @param path Path to get file info for.
-     * @return Future for info operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to update file properties.
-     *
-     * @param path IGFS path to update properties.
-     * @param props Properties to update.
-     * @return Future for update operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Sets last access time and last modification time for a file.
-     *
-     * @param path Path to update times.
-     * @param accessTime Last access time to set.
-     * @param modificationTime Last modification time to set.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Command to rename given path.
-     *
-     * @param src Source path.
-     * @param dest Destination path.
-     * @return Future for rename operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to delete given path.
-     *
-     * @param path Path to delete.
-     * @param recursive {@code True} if deletion is recursive.
-     * @return Future for delete operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get affinity for given path, offset and length.
-     *
-     * @param path Path to get affinity for.
-     * @param start Start position (offset).
-     * @param len Data length.
-     * @return Future for affinity command.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Gets path summary.
-     *
-     * @param path Path to get summary for.
-     * @return Future that will be completed when summary is received.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to create directories.
-     *
-     * @param path Path to create.
-     * @return Future for mkdirs operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get list of files in directory.
-     *
-     * @param path Path to list.
-     * @return Future for listFiles operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to get directory listing.
-     *
-     * @param path Path to list.
-     * @return Future for listPaths operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Performs status request.
-     *
-     * @return Status response.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to open file for reading.
-     *
-     * @param path File path to open.
-     * @return Future for open operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
-
-    /**
-     * Command to open file for reading.
-     *
-     * @param path File path to open.
-     * @return Future for open operation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
-        IOException;
-
-    /**
-     * Command to create file and open it for output.
-     *
-     * @param path Path to file.
-     * @param overwrite If {@code true} then old file contents will be lost.
-     * @param colocate If {@code true} and called on data node, file will be written on that node.
-     * @param replication Replication factor.
-     * @param props File properties for creation.
-     * @return Stream descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-
-    /**
-     * Open file for output appending data to the end of a file.
-     *
-     * @param path Path to file.
-     * @param create If {@code true}, file will be created if it does not exist.
-     * @param props File properties.
-     * @return Stream descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
-}
\ No newline at end of file
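
For orientation, a hedged sketch of a typical call sequence against this facade; how a concrete HadoopIgfs instance is obtained (in-process or over IPC) is outside the snippet and assumed, as are the example paths:

    static void printLength(HadoopIgfs igfs) throws Exception {
        // Handshake first; the log directory argument is an example value.
        igfs.handshake("/var/log/igfs-sampling");

        IgfsPath path = new IgfsPath("/user/hadoop/data.txt");

        IgfsFile info = igfs.info(path);

        if (info != null && info.isFile())
            System.out.println("Length: " + info.length());

        // Release the connection without forcing.
        igfs.close(false);
    }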

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
deleted file mode 100644
index d610091..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * Communication exception indicating a problem between file system and IGFS instance.
- */
-public class HadoopIgfsCommunicationException extends IgniteCheckedException {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Creates new exception with given throwable as a nested cause and
-     * source of error message.
-     *
-     * @param cause Non-null throwable cause.
-     */
-    public HadoopIgfsCommunicationException(Exception cause) {
-        super(cause);
-    }
-
-    /**
-     * Creates a new exception with given error message.
-     *
-     * @param msg Error message.
-     */
-    public HadoopIgfsCommunicationException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * Creates a new exception with given error message and nested cause exception.
-     *
-     * @param msg Error message.
-     * @param cause Cause.
-     */
-    public HadoopIgfsCommunicationException(String msg, Exception cause) {
-        super(msg, cause);
-    }
-}
\ No newline at end of file