You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/03/05 10:05:31 UTC
[31/58] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed
changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
new file mode 100644
index 0000000..351839a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Counter for the job statistics accumulation.
+ */
+public class HadoopPerformanceCounter extends HadoopCounterAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The group name for this counter. */
+ private static final String GROUP_NAME = "SYSTEM";
+
+ /** The counter name for this counter. */
+ private static final String COUNTER_NAME = "PERFORMANCE";
+
+ /** Events collection. */
+ private Collection<T2<String,Long>> evts = new ArrayList<>();
+
+ /** Node id to insert into the event info. */
+ private UUID nodeId;
+
+ /** Number of the reducer passed to the most recent {@link #onShuffleMessage(int, long)} call. */
+ private int reducerNum;
+
+ /** Timestamp of the first registered shuffle message, or {@code null} if none was registered. */
+ private volatile Long firstShuffleMsg;
+
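+ /** Timestamp of the last registered shuffle message; reset to {@code null} once a REDUCE task finish is recorded. */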
+ private volatile Long lastShuffleMsg;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ public HadoopPerformanceCounter() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param grp Group name.
+ * @param name Counter name.
+ */
+ public HadoopPerformanceCounter(String grp, String name) {
+ super(grp, name);
+ }
+
+ /**
+ * Constructor to create instance to use this as helper.
+ *
+ * @param nodeId Id of the work node.
+ */
+ public HadoopPerformanceCounter(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeValue(ObjectOutput out) throws IOException {
+ U.writeCollection(out, evts);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void readValue(ObjectInput in) throws IOException {
+ try {
+ evts = U.readCollection(in);
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void merge(HadoopCounter cntr) {
+ evts.addAll(((HadoopPerformanceCounter)cntr).evts);
+ }
+
+ /**
+ * Gets the events collection.
+ *
+ * @return Collection of events.
+ */
+ public Collection<T2<String, Long>> evts() {
+ return evts;
+ }
+
+ /**
+ * Generate name that consists of some event information.
+ *
+ * @param info Task info.
+ * @param evtType The type of the event.
+ * @return String contains necessary event information.
+ */
+ private String eventName(HadoopTaskInfo info, String evtType) {
+ return eventName(info.type().toString(), info.taskNumber(), evtType);
+ }
+
+ /**
+ * Generate name that consists of some event information.
+ *
+ * @param taskType Task type.
+ * @param taskNum Number of the task.
+ * @param evtType The type of the event.
+ * @return String contains necessary event information.
+ */
+ private String eventName(String taskType, int taskNum, String evtType) {
+ assert nodeId != null;
+
+ return taskType + " " + taskNum + " " + evtType + " " + nodeId;
+ }
+
+ /**
+ * Adds event of the task submission (task instance creation).
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskSubmit(HadoopTaskInfo info, long ts) {
+ evts.add(new T2<>(eventName(info, "submit"), ts));
+ }
+
+ /**
+ * Adds event of the task preparation.
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskPrepare(HadoopTaskInfo info, long ts) {
+ evts.add(new T2<>(eventName(info, "prepare"), ts));
+ }
+
+ /**
+ * Adds event of the task finish; for a REDUCE task with registered shuffle messages, SHUFFLE start/finish events are added first.
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskFinish(HadoopTaskInfo info, long ts) {
+ if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) {
+ evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
+ evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
+
+ lastShuffleMsg = null;
+ }
+
+ evts.add(new T2<>(eventName(info, "finish"), ts));
+ }
+
+ /**
+ * Adds event of the task run.
+ *
+ * @param info Task info.
+ * @param ts Timestamp of the event.
+ */
+ public void onTaskStart(HadoopTaskInfo info, long ts) {
+ evts.add(new T2<>(eventName(info, "start"), ts));
+ }
+
+ /**
+ * Adds event of the job preparation.
+ *
+ * @param ts Timestamp of the event.
+ */
+ public void onJobPrepare(long ts) {
+ assert nodeId != null;
+
+ evts.add(new T2<>("JOB prepare " + nodeId, ts));
+ }
+
+ /**
+ * Adds event of the job start.
+ *
+ * @param ts Timestamp of the event.
+ */
+ public void onJobStart(long ts) {
+ assert nodeId != null;
+
+ evts.add(new T2<>("JOB start " + nodeId, ts));
+ }
+
+ /**
+ * Adds client submission events from job info.
+ *
+ * @param info Job info.
+ */
+ public void clientSubmissionEvents(HadoopJobInfo info) {
+ assert nodeId != null;
+
+ addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
+ addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
+ addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
+ }
+
+ /**
+ * Adds event with timestamp from some property in job info.
+ *
+ * @param evt Event type and phase.
+ * @param info Job info.
+ * @param propName Property name to get timestamp.
+ */
+ private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) {
+ String val = info.property(propName);
+
+ if (!F.isEmpty(val)) {
+ try {
+ evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
+ }
+ }
+ }
+
+ /**
+ * Registers shuffle message event.
+ *
+ * @param reducerNum Number of reducer that receives the data.
+ * @param ts Timestamp of the event.
+ */
+ public void onShuffleMessage(int reducerNum, long ts) {
+ this.reducerNum = reducerNum;
+
+ if (firstShuffleMsg == null)
+ firstShuffleMsg = ts;
+
+ lastShuffleMsg = ts;
+ }
+
+ /**
+ * Gets system predefined performance counter from the HadoopCounters object.
+ *
+ * @param cntrs HadoopCounters object.
+ * @param nodeId Node id for methods that add events. It may be null if those methods are not used.
+ * @return Predefined performance counter (the same instance the node id was set on).
+ */
+ public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) {
+ HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
+
+ if (nodeId != null)
+ cntr.nodeId(nodeId);
+
+ return cntr;
+ }
+
+ /**
+ * Sets the nodeId field.
+ *
+ * @param nodeId Node id.
+ */
+ private void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
deleted file mode 100644
index e9461e2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.IgfsConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class GridHadoopDistributedFileSystem extends DistributedFileSystem {
- /** User name for each thread. */
- private final ThreadLocal<String> userName = new ThreadLocal<String>() {
- /** {@inheritDoc} */
- @Override protected String initialValue() {
- return DFLT_USER_NAME;
- }
- };
-
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
- /** {@inheritDoc} */
- @Override protected Path initialValue() {
- return getHomeDirectory();
- }
- };
-
- /** {@inheritDoc} */
- @Override public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
- }
-
- /**
- * Set user name and default working directory for current thread.
- *
- * @param userName User name.
- */
- public void setUser(String userName) {
- this.userName.set(userName);
-
- setWorkingDirectory(getHomeDirectory());
- }
-
- /** {@inheritDoc} */
- @Override public Path getHomeDirectory() {
- Path path = new Path("/user/" + userName.get());
-
- return path.makeQualified(getUri(), null);
- }
-
- /** {@inheritDoc} */
- @Override public void setWorkingDirectory(Path dir) {
- Path fixedDir = fixRelativePart(dir);
-
- String res = fixedDir.toUri().getPath();
-
- if (!DFSUtil.isValidName(res))
- throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
- workingDir.set(fixedDir);
- }
-
- /** {@inheritDoc} */
- @Override public Path getWorkingDirectory() {
- return workingDir.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
deleted file mode 100644
index 52e7d29..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.igfs.hadoop.v1.*;
-
-/**
- * Utilities for configuring file systems to support the separate working directory per each thread.
- */
-public class GridHadoopFileSystemsUtils {
- /** Name of the property for setting working directory on create new local FS instance. */
- public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
- /**
- * Set user name and default working directory for current thread if it's supported by file system.
- *
- * @param fs File system.
- * @param userName User name.
- */
- public static void setUser(FileSystem fs, String userName) {
- if (fs instanceof IgfsHadoopFileSystem)
- ((IgfsHadoopFileSystem)fs).setUser(userName);
- else if (fs instanceof GridHadoopDistributedFileSystem)
- ((GridHadoopDistributedFileSystem)fs).setUser(userName);
- }
-
- /**
- * Setup wrappers of filesystems to support the separate working directory.
- *
- * @param cfg Config for setup.
- */
- public static void setupFileSystems(Configuration cfg) {
- cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", GridHadoopLocalFileSystemV1.class.getName());
- cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
- GridHadoopLocalFileSystemV2.class.getName());
-
- cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", GridHadoopDistributedFileSystem.class.getName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
deleted file mode 100644
index 28834d4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.fs.*;
-
-import java.io.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV1 extends LocalFileSystem {
- /**
- * Creates new local file system.
- */
- public GridHadoopLocalFileSystemV1() {
- super(new GridHadoopRawLocalFileSystem());
- }
-
- /** {@inheritDoc} */
- @Override public File pathToFile(Path path) {
- return ((GridHadoopRawLocalFileSystem)getRaw()).convert(path);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
deleted file mode 100644
index 62d7cea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.local.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.hadoop.fs.FsConstants.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV2 extends ChecksumFs {
- /**
- * Creates new local file system.
- *
- * @param cfg Configuration.
- * @throws IOException If failed.
- * @throws URISyntaxException If failed.
- */
- public GridHadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
- super(new DelegateFS(cfg));
- }
-
- /**
- * Creates new local file system.
- *
- * @param uri URI.
- * @param cfg Configuration.
- * @throws IOException If failed.
- * @throws URISyntaxException If failed.
- */
- public GridHadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
- this(cfg);
- }
-
- /**
- * Delegate file system.
- */
- private static class DelegateFS extends DelegateToFileSystem {
- /**
- * Creates new local file system.
- *
- * @param cfg Configuration.
- * @throws IOException If failed.
- * @throws URISyntaxException If failed.
- */
- public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
- super(LOCAL_FS_URI, new GridHadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
- }
-
- /** {@inheritDoc} */
- @Override public int getUriDefaultPort() {
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public FsServerDefaults getServerDefaults() throws IOException {
- return LocalConfigKeys.getServerDefaults();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isValidName(String src) {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
deleted file mode 100644
index 29645f8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.file.*;
-
-/**
- * Local file system implementation for Hadoop.
- */
-public class GridHadoopRawLocalFileSystem extends FileSystem {
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
- @Override protected Path initialValue() {
- return getInitialWorkingDirectory();
- }
- };
-
- /**
- * Converts Hadoop path to local path.
- *
- * @param path Hadoop path.
- * @return Local path.
- */
- File convert(Path path) {
- checkPath(path);
-
- if (path.isAbsolute())
- return new File(path.toUri().getPath());
-
- return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
- }
-
- /** {@inheritDoc} */
- @Override public Path getHomeDirectory() {
- return makeQualified(new Path(System.getProperty("user.home")));
- }
-
- /** {@inheritDoc} */
- @Override public Path getInitialWorkingDirectory() {
- File f = new File(System.getProperty("user.dir"));
-
- return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- setConf(conf);
-
- String initWorkDir = conf.get(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
-
- if (initWorkDir != null)
- setWorkingDirectory(new Path(initWorkDir));
- }
-
- /** {@inheritDoc} */
- @Override public URI getUri() {
- return FsConstants.LOCAL_FS_URI;
- }
-
- /** {@inheritDoc} */
- @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- return new FSDataInputStream(new InStream(checkExists(convert(f))));
- }
-
- /** {@inheritDoc} */
- @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
- short replication, long blockSize, Progressable progress) throws IOException {
- File file = convert(f);
-
- if (!overwrite && !file.createNewFile())
- throw new IOException("Failed to create new file: " + f.toUri());
-
- return out(file, false, bufSize);
- }
-
- /**
- * @param file File.
- * @param append Append flag.
- * @return Output stream.
- * @throws IOException If failed.
- */
- private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
- return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
- bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
- }
-
- /** {@inheritDoc} */
- @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
- return out(convert(f), true, bufSize);
- }
-
- /** {@inheritDoc} */
- @Override public boolean rename(Path src, Path dst) throws IOException {
- return convert(src).renameTo(convert(dst));
- }
-
- /** {@inheritDoc} */
- @Override public boolean delete(Path f, boolean recursive) throws IOException {
- File file = convert(f);
-
- if (file.isDirectory() && !recursive)
- throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
-
- return U.delete(file);
- }
-
- /** {@inheritDoc} */
- @Override public void setWorkingDirectory(Path dir) {
- workDir.set(fixRelativePart(dir));
-
- checkPath(dir);
- }
-
- /** {@inheritDoc} */
- @Override public Path getWorkingDirectory() {
- return workDir.get();
- }
-
- /** {@inheritDoc} */
- @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
- if(f == null)
- throw new IllegalArgumentException("mkdirs path arg is null");
-
- Path parent = f.getParent();
-
- File p2f = convert(f);
-
- if(parent != null) {
- File parent2f = convert(parent);
-
- if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
- throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
-
- }
-
- return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus getFileStatus(Path f) throws IOException {
- return fileStatus(checkExists(convert(f)));
- }
-
- /**
- * @return File status.
- */
- private FileStatus fileStatus(File file) throws IOException {
- boolean dir = file.isDirectory();
-
- java.nio.file.Path path = dir ? null : file.toPath();
-
- return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
- /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
- new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
- }
-
- /**
- * @param file File.
- * @return Same file.
- * @throws FileNotFoundException If does not exist.
- */
- private static File checkExists(File file) throws FileNotFoundException {
- if (!file.exists())
- throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
-
- return file;
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus[] listStatus(Path f) throws IOException {
- File file = convert(f);
-
- if (checkExists(file).isFile())
- return new FileStatus[] {fileStatus(file)};
-
- File[] files = file.listFiles();
-
- FileStatus[] res = new FileStatus[files.length];
-
- for (int i = 0; i < res.length; i++)
- res[i] = fileStatus(files[i]);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean supportsSymlinks() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
- Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
- return getFileStatus(getLinkTarget(f));
- }
-
- /** {@inheritDoc} */
- @Override public Path getLinkTarget(Path f) throws IOException {
- File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
-
- return new Path(file.toURI());
- }
-
- /**
- * Input stream.
- */
- private static class InStream extends InputStream implements Seekable, PositionedReadable {
- /** */
- private final RandomAccessFile file;
-
- /**
- * @param f File.
- * @throws IOException If failed.
- */
- public InStream(File f) throws IOException {
- file = new RandomAccessFile(f, "r");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read() throws IOException {
- return file.read();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
- return file.read(b, off, len);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void close() throws IOException {
- file.close();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
- long pos0 = file.getFilePointer();
-
- file.seek(pos);
- int res = file.read(buf, off, len);
-
- file.seek(pos0);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
- if (read(pos, buf, off, len) != len)
- throw new IOException();
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf) throws IOException {
- readFully(pos, buf, 0, buf.length);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void seek(long pos) throws IOException {
- file.seek(pos);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized long getPos() throws IOException {
- return file.getFilePointer();
- }
-
- /** {@inheritDoc} */
- @Override public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
new file mode 100644
index 0000000..509f443
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+
+/**
+ * HDFS wrapper that tracks the user name and the working directory separately for every thread.
+ */
+public class HadoopDistributedFileSystem extends DistributedFileSystem {
+    /** Per-thread user name, {@code DFLT_USER_NAME} until {@link #setUser(String)} is called on the thread. */
+    private final ThreadLocal<String> threadUser = new ThreadLocal<String>() {
+        /** {@inheritDoc} */
+        @Override protected String initialValue() {
+            return DFLT_USER_NAME;
+        }
+    };
+
+    /** Per-thread working directory, the home directory of the thread's user until set explicitly. */
+    private final ThreadLocal<Path> threadWorkDir = new ThreadLocal<Path>() {
+        /** {@inheritDoc} */
+        @Override protected Path initialValue() {
+            return getHomeDirectory();
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override public void initialize(URI uri, Configuration conf) throws IOException {
+        super.initialize(uri, conf);
+
+        // The job user from the configuration becomes the user of the initializing thread.
+        String cfgUser = conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME);
+
+        setUser(cfgUser);
+    }
+
+    /**
+     * Assigns the user name to the current thread and resets the thread's working directory
+     * to the home directory of that user.
+     *
+     * @param userName User name.
+     */
+    public void setUser(String userName) {
+        threadUser.set(userName);
+
+        Path home = getHomeDirectory();
+
+        setWorkingDirectory(home);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getHomeDirectory() {
+        return new Path("/user/" + threadUser.get()).makeQualified(getUri(), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWorkingDirectory(Path dir) {
+        Path absDir = fixRelativePart(dir);
+
+        String absPath = absDir.toUri().getPath();
+
+        if (DFSUtil.isValidName(absPath)) {
+            threadWorkDir.set(absDir);
+
+            return;
+        }
+
+        throw new IllegalArgumentException("Invalid DFS directory name " + absPath);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getWorkingDirectory() {
+        return threadWorkDir.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
new file mode 100644
index 0000000..f3f51d4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+
+/**
+ * Utilities for configuring file systems to support the separate working directory per each thread.
+ */
+public class HadoopFileSystemsUtils {
+ /** Name of the property for setting working directory on create new local FS instance. */
+ public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
+
+ /**
+ * Set user name and default working directory for current thread if it's supported by file system.
+ *
+ * @param fs File system.
+ * @param userName User name.
+ */
+ public static void setUser(FileSystem fs, String userName) {
+ if (fs instanceof IgniteHadoopFileSystem)
+ ((IgniteHadoopFileSystem)fs).setUser(userName);
+ else if (fs instanceof HadoopDistributedFileSystem)
+ ((HadoopDistributedFileSystem)fs).setUser(userName);
+ }
+
+ /**
+ * Setup wrappers of filesystems to support the separate working directory.
+ *
+ * @param cfg Config for setup.
+ */
+ public static void setupFileSystems(Configuration cfg) {
+ cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
+ cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
+ HadoopLocalFileSystemV2.class.getName());
+
+ cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
new file mode 100644
index 0000000..9cc5881
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV1 extends LocalFileSystem {
+ /**
+ * Creates new local file system.
+ */
+ public HadoopLocalFileSystemV1() {
+ super(new HadoopRawLocalFileSystem());
+ }
+
+ /** {@inheritDoc} */
+ @Override public File pathToFile(Path path) {
+ return ((HadoopRawLocalFileSystem)getRaw()).convert(path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
new file mode 100644
index 0000000..15ddc5a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.local.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.hadoop.fs.FsConstants.*;
+
+/**
+ * Replacement of the local file system for Hadoop jobs, backed by {@link HadoopRawLocalFileSystem}
+ * through a delegating adapter.
+ */
+public class HadoopLocalFileSystemV2 extends ChecksumFs {
+    /**
+     * Creates the file system on top of a new delegate.
+     *
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
+        super(new DelegateFS(cfg));
+    }
+
+    /**
+     * Creates the file system on top of a new delegate. The URI argument is accepted but not used.
+     *
+     * @param uri URI (ignored).
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
+        super(new DelegateFS(cfg));
+    }
+
+    /**
+     * Adapter that delegates to a {@link HadoopRawLocalFileSystem} registered under the local FS URI.
+     */
+    private static class DelegateFS extends DelegateToFileSystem {
+        /**
+         * Creates the delegate around a new {@link HadoopRawLocalFileSystem}.
+         *
+         * @param cfg Configuration.
+         * @throws IOException If failed.
+         * @throws URISyntaxException If failed.
+         */
+        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
+            super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return Always {@code -1}.
+         */
+        @Override public int getUriDefaultPort() {
+            return -1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FsServerDefaults getServerDefaults() throws IOException {
+            return LocalConfigKeys.getServerDefaults();
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return Always {@code true}: no name is rejected.
+         */
+        @Override public boolean isValidName(String src) {
+            return true;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
new file mode 100644
index 0000000..7edcec0
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+/**
+ * This class lists parameters that can be specified in Hadoop configuration.
+ * Hadoop configuration can be specified in {@code core-site.xml} file
+ * or passed to map-reduce task directly when using Hadoop driver for IGFS file system:
+ * <ul>
+ * <li>
+ * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
+ * the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
+ * IGFS data node configuration property.
+ * </li>
+ * <li>
+ * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
+ * {@code true}, then all file system operations will be logged to a file.
+ * </li>
+ * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
+ * <li>
+ * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
+ * it gets flushed to log file. Higher values will imply greater performance, but will increase delay
+ * before record appears in the log file.
+ * </li>
+ * <li>
+ * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
+ * node to which client is connected. If {@code true}, file will not be distributed and will be written
+ * to a single data node. Default value is {@code true}.
+ * </li>
+ * <li>
+ * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to
+ * local data node if it has enough free space. After some time it can be redistributed across nodes though.
+ * </li>
+ * </ul>
+ * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
+ * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
+ * <p>
+ * Sample configuration that can be placed to {@code core-site.xml} file:
+ * <pre name="code" class="xml">
+ * <property>
+ * <name>fs.igfs.127.0.0.1:10500.log.enabled</name>
+ * <value>true</value>
+ * </property>
+ * <property>
+ * <name>fs.igfs.127.0.0.1:10500.log.dir</name>
+ * <value>/home/apache/ignite/log/sampling</value>
+ * </property>
+ * <property>
+ * <name>fs.igfs.127.0.0.1:10500.log.batch_size</name>
+ * <value>16</value>
+ * </property>
+ * </pre>
+ * Parameters could also be specified per mapreduce job, e.g.
+ * <pre name="code" class="bash">
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.127.0.0.1:10500.open.sequential_reads_before_prefetch=4
+ * </pre>
+ * If you want to use these parameters in code, you have to substitute your file system name into them. The easiest
+ * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
+ */
+public class HadoopParameters {
+ /**
+ * Parameter name for control over file colocation write mode. The {@code %s} placeholder stands for the
+ * file system endpoint name and has to be substituted, e.g. with {@code String.format}.
+ */
+ public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
+
+ /**
+ * Parameter name for custom sequential reads before prefetch value. The {@code %s} placeholder stands for
+ * the file system endpoint name.
+ */
+ public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
+ "fs.igfs.%s.open.sequential_reads_before_prefetch";
+
+ /** Parameter name for client logger directory. The {@code %s} placeholder stands for the endpoint name. */
+ public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
+
+ /** Parameter name for log batch size. The {@code %s} placeholder stands for the endpoint name. */
+ public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
+
+ /** Parameter name for log enabled flag. The {@code %s} placeholder stands for the endpoint name. */
+ public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
+
+ /** Parameter name for prefer local writes flag. Unlike the other names it has no endpoint placeholder. */
+ public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..e5ec3f7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+
+/**
+ * Local file system implementation for Hadoop.
+ */
+public class HadoopRawLocalFileSystem extends FileSystem {
+ /** Working directory for each thread. */
+ private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
+ @Override protected Path initialValue() {
+ return getInitialWorkingDirectory();
+ }
+ };
+
+ /**
+ * Converts Hadoop path to local path.
+ *
+ * @param path Hadoop path.
+ * @return Local path.
+ */
+ File convert(Path path) {
+ checkPath(path);
+
+ if (path.isAbsolute())
+ return new File(path.toUri().getPath());
+
+ return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getHomeDirectory() {
+ return makeQualified(new Path(System.getProperty("user.home")));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getInitialWorkingDirectory() {
+ File f = new File(System.getProperty("user.dir"));
+
+ return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+
+ setConf(conf);
+
+ String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
+
+ if (initWorkDir != null)
+ setWorkingDirectory(new Path(initWorkDir));
+ }
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ return FsConstants.LOCAL_FS_URI;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return new FSDataInputStream(new InStream(checkExists(convert(f))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ File file = convert(f);
+
+ if (!overwrite && !file.createNewFile())
+ throw new IOException("Failed to create new file: " + f.toUri());
+
+ return out(file, false, bufSize);
+ }
+
+ /**
+ * @param file File.
+ * @param append Append flag.
+ * @return Output stream.
+ * @throws IOException If failed.
+ */
+ private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
+ bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
+ return out(convert(f), true, bufSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rename(Path src, Path dst) throws IOException {
+ return convert(src).renameTo(convert(dst));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ File file = convert(f);
+
+ if (file.isDirectory() && !recursive)
+ throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
+
+ return U.delete(file);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path dir) {
+ workDir.set(fixRelativePart(dir));
+
+ checkPath(dir);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return workDir.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ if(f == null)
+ throw new IllegalArgumentException("mkdirs path arg is null");
+
+ Path parent = f.getParent();
+
+ File p2f = convert(f);
+
+ if(parent != null) {
+ File parent2f = convert(parent);
+
+ if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
+ throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
+
+ }
+
+ return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ return fileStatus(checkExists(convert(f)));
+ }
+
+ /**
+ * @return File status.
+ */
+ private FileStatus fileStatus(File file) throws IOException {
+ boolean dir = file.isDirectory();
+
+ java.nio.file.Path path = dir ? null : file.toPath();
+
+ return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
+ /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
+ new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
+ }
+
+ /**
+ * @param file File.
+ * @return Same file.
+ * @throws FileNotFoundException If does not exist.
+ */
+ private static File checkExists(File file) throws FileNotFoundException {
+ if (!file.exists())
+ throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
+
+ return file;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws IOException {
+ File file = convert(f);
+
+ if (checkExists(file).isFile())
+ return new FileStatus[] {fileStatus(file)};
+
+ File[] files = file.listFiles();
+
+ FileStatus[] res = new FileStatus[files.length];
+
+ for (int i = 0; i < res.length; i++)
+ res[i] = fileStatus(files[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean supportsSymlinks() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
+ Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
+ return getFileStatus(getLinkTarget(f));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getLinkTarget(Path f) throws IOException {
+ File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
+
+ return new Path(file.toURI());
+ }
+
+ /**
+ * Input stream.
+ */
+ private static class InStream extends InputStream implements Seekable, PositionedReadable {
+ /** */
+ private final RandomAccessFile file;
+
+ /**
+ * @param f File.
+ * @throws IOException If failed.
+ */
+ public InStream(File f) throws IOException {
+ file = new RandomAccessFile(f, "r");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read() throws IOException {
+ return file.read();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+ return file.read(b, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() throws IOException {
+ file.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+ long pos0 = file.getFilePointer();
+
+ file.seek(pos);
+ int res = file.read(buf, off, len);
+
+ file.seek(pos0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+ if (read(pos, buf, off, len) != len)
+ throw new IOException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf) throws IOException {
+ readFully(pos, buf, 0, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void seek(long pos) throws IOException {
+ file.seek(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long getPos() throws IOException {
+ return file.getFilePointer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
new file mode 100644
index 0000000..b3cb235
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Facade for communication with grid. Every operation returns its result directly;
+ * failures are reported through the declared exceptions.
+ */
+public interface HadoopIgfs {
+ /**
+ * Perform handshake.
+ *
+ * @param logDir Log directory.
+ * @return Handshake result.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException;
+
+ /**
+ * Close connection.
+ *
+ * @param force Force flag.
+ */
+ public void close(boolean force);
+
+ /**
+ * Command to retrieve file info for some IGFS path.
+ *
+ * @param path Path to get file info for.
+ * @return File info.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to update file properties.
+ *
+ * @param path IGFS path to update properties.
+ * @param props Properties to update.
+ * @return File info returned by the update operation.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+ /**
+ * Sets last access time and last modification time for a file.
+ *
+ * @param path Path to update times.
+ * @param accessTime Last access time to set.
+ * @param modificationTime Last modification time to set.
+ * @return Result flag of the operation (exact meaning is defined by the implementation).
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException,
+ IOException;
+
+ /**
+ * Command to rename given path.
+ *
+ * @param src Source path.
+ * @param dest Destination path.
+ * @return Result flag of the rename operation.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to delete given path.
+ *
+ * @param path Path to delete.
+ * @param recursive {@code True} if deletion is recursive.
+ * @return Result flag of the delete operation.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to get affinity for given path, offset and length.
+ *
+ * @param path Path to get affinity for.
+ * @param start Start position (offset).
+ * @param len Data length.
+ * @return Block locations for the requested range.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) throws IgniteCheckedException,
+ IOException;
+
+ /**
+ * Gets path summary.
+ *
+ * @param path Path to get summary for.
+ * @return Path summary.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to create directories.
+ *
+ * @param path Path to create.
+ * @param props Properties passed along with the mkdirs command.
+ * @return Result flag of the mkdirs operation.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to get list of files in directory.
+ *
+ * @param path Path to list.
+ * @return Files found under the path.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to get directory listing.
+ *
+ * @param path Path to list.
+ * @return Paths found under the path.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Performs status request.
+ *
+ * @return Status response.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to open file for reading.
+ *
+ * @param path File path to open.
+ * @return Stream descriptor.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException;
+
+ /**
+ * Command to open file for reading with an explicit prefetch setting.
+ *
+ * @param path File path to open.
+ * @param seqReadsBeforePrefetch Sequential reads before prefetch value.
+ * @return Stream descriptor.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException,
+ IOException;
+
+ /**
+ * Command to create file and open it for output.
+ *
+ * @param path Path to file.
+ * @param overwrite If {@code true} then old file contents will be lost.
+ * @param colocate If {@code true} and called on data node, file will be written on that node.
+ * @param replication Replication factor.
+ * @param blockSize Block size.
+ * @param props File properties for creation.
+ * @return Stream descriptor.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+ int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+
+ /**
+ * Open file for output appending data to the end of a file.
+ *
+ * @param path Path to file.
+ * @param create If {@code true}, file will be created if does not exist.
+ * @param props File properties.
+ * @return Stream descriptor.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+ @Nullable Map<String, String> props) throws IgniteCheckedException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
new file mode 100644
index 0000000..ff69478
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+
+/**
+ * Communication exception indicating a problem between file system and IGFS instance.
+ * <p>
+ * All constructors simply delegate to the corresponding {@link IgniteCheckedException} constructors.
+ */
+public class HadoopIgfsCommunicationException extends IgniteCheckedException {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates new exception with given throwable as a nested cause and
+ * source of error message.
+ *
+ * @param cause Non-null throwable cause.
+ */
+ public HadoopIgfsCommunicationException(Exception cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a new exception with given error message and no nested cause.
+ *
+ * @param msg Error message.
+ */
+ public HadoopIgfsCommunicationException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new exception with given error message and optional nested cause exception.
+ *
+ * @param msg Error message.
+ * @param cause Cause.
+ */
+ public HadoopIgfsCommunicationException(String msg, Exception cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
new file mode 100644
index 0000000..7502f57
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEndpoint.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+
+/**
+ * IGFS endpoint abstraction.
+ */
+public class HadoopIgfsEndpoint {
+ /** Localhost. */
+ public static final String LOCALHOST = "127.0.0.1";
+
+ /** IGFS name. */
+ private final String igfsName;
+
+ /** Grid name. */
+ private final String gridName;
+
+ /** Host. */
+ private final String host;
+
+ /** Port. */
+ private final int port;
+
+ /**
+ * Normalize IGFS URI.
+ *
+ * @param uri URI.
+ * @return Normalized URI.
+ * @throws IOException If failed.
+ */
+ public static URI normalize(URI uri) throws IOException {
+ try {
+ if (!F.eq(IgniteFileSystem.IGFS_SCHEME, uri.getScheme()))
+ throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri);
+
+ HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority());
+
+ StringBuilder sb = new StringBuilder();
+
+ if (endpoint.igfs() != null)
+ sb.append(endpoint.igfs());
+
+ if (endpoint.grid() != null)
+ sb.append(":").append(endpoint.grid());
+
+ return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
+ uri.getPath(), uri.getQuery(), uri.getFragment());
+ }
+ catch (URISyntaxException | IgniteCheckedException e) {
+ throw new IOException("Failed to normalize URI: " + uri, e);
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param connStr Connection string.
+ * @throws IgniteCheckedException If failed to parse connection string.
+ */
+ public HadoopIgfsEndpoint(@Nullable String connStr) throws IgniteCheckedException {
+ if (connStr == null)
+ connStr = "";
+
+ String[] tokens = connStr.split("@", -1);
+
+ IgniteBiTuple<String, Integer> hostPort;
+
+ if (tokens.length == 1) {
+ igfsName = null;
+ gridName = null;
+
+ hostPort = hostPort(connStr, connStr);
+ }
+ else if (tokens.length == 2) {
+ String authStr = tokens[0];
+
+ if (authStr.isEmpty()) {
+ gridName = null;
+ igfsName = null;
+ }
+ else {
+ String[] authTokens = authStr.split(":", -1);
+
+ igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
+
+ if (authTokens.length == 1)
+ gridName = null;
+ else if (authTokens.length == 2)
+ gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
+ else
+ throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+ }
+
+ hostPort = hostPort(connStr, tokens[1]);
+ }
+ else
+ throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+ host = hostPort.get1();
+
+ assert hostPort.get2() != null;
+
+ port = hostPort.get2();
+ }
+
+ /**
+ * Parse host and port.
+ *
+ * @param connStr Full connection string.
+ * @param hostPortStr Host/port connection string part.
+ * @return Tuple with host and port.
+ * @throws IgniteCheckedException If failed to parse connection string.
+ */
+ private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException {
+ String[] tokens = hostPortStr.split(":", -1);
+
+ String host = tokens[0];
+
+ if (F.isEmpty(host))
+ host = LOCALHOST;
+
+ int port;
+
+ if (tokens.length == 1)
+ port = DFLT_IPC_PORT;
+ else if (tokens.length == 2) {
+ String portStr = tokens[1];
+
+ try {
+ port = Integer.valueOf(portStr);
+
+ if (port < 0 || port > 65535)
+ throw new IgniteCheckedException("Invalid port number: " + connStr);
+ }
+ catch (NumberFormatException e) {
+ throw new IgniteCheckedException("Invalid port number: " + connStr);
+ }
+ }
+ else
+ throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+ return F.t(host, port);
+ }
+
+ /**
+ * @return IGFS name.
+ */
+ @Nullable public String igfs() {
+ return igfsName;
+ }
+
+ /**
+ * @return Grid name.
+ */
+ @Nullable public String grid() {
+ return gridName;
+ }
+
+ /**
+ * @return Host.
+ */
+ public String host() {
+ return host;
+ }
+
+ /**
+ * @return Host.
+ */
+ public boolean isLocal() {
+ return F.eq(LOCALHOST, host);
+ }
+
+ /**
+ * @return Port.
+ */
+ public int port() {
+ return port;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopIgfsEndpoint.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
new file mode 100644
index 0000000..2200e78
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Extended IGFS server interface.
+ */
+public interface HadoopIgfsEx extends HadoopIgfs {
+ /**
+ * Adds event listener that will be invoked when connection with server is lost or remote error has occurred.
+ * If connection is closed already, callback will be invoked synchronously inside this method.
+ *
+ * @param delegate Stream delegate.
+ * @param lsnr Event listener.
+ */
+ public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr);
+
+ /**
+ * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+ *
+ * @param delegate Stream delegate.
+ */
+ public void removeEventListener(HadoopIgfsStreamDelegate delegate);
+
+ /**
+ * Asynchronously reads specified amount of bytes from opened input stream.
+ *
+ * @param delegate Stream delegate.
+ * @param pos Position to read from.
+ * @param len Data length to read.
+ * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
+ * bytes will be read into newly allocated buffer of length {@code len - outBuf.length} and this buffer will
+ * be the result of read future.
+ * @param outOff Output offset.
+ * @param outLen Output length.
+ * @return Future for the read data.
+ */
+ public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+ @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+ /**
+ * Writes data to the stream of the given delegate. This method does not return any future since
+ * no response to write request is sent.
+ *
+ * @param delegate Stream delegate.
+ * @param data Data to write.
+ * @param off Offset.
+ * @param len Length.
+ * @throws IOException If failed.
+ */
+ public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
+
+ /**
+ * Close server stream.
+ *
+ * @param delegate Stream delegate.
+ * @throws IOException If failed.
+ */
+ public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+ /**
+ * Flush output stream.
+ *
+ * @param delegate Stream delegate.
+ * @throws IOException If failed.
+ */
+ public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
new file mode 100644
index 0000000..59a8f49
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * IGFS client future that additionally carries output buffer parameters (buffer, offset and length)
+ * and a flag telling whether the future belongs to a read operation.
+ * <p>
+ * The class is a plain holder: setters store the values as is and getters return them unchanged.
+ */
+public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> {
+    /** Output buffer, {@code null} until set (or if explicitly set to {@code null}). */
+    private byte[] outBuf;
+
+    /** Output offset. */
+    private int outOff;
+
+    /** Output length. */
+    private int outLen;
+
+    /** Read future flag. */
+    private boolean read;
+
+    /**
+     * @return Output buffer or {@code null} if it was not set.
+     */
+    @Nullable public byte[] outputBuffer() {
+        return outBuf;
+    }
+
+    /**
+     * @param outBuf Output buffer.
+     */
+    public void outputBuffer(@Nullable byte[] outBuf) {
+        this.outBuf = outBuf;
+    }
+
+    /**
+     * @return Offset in output buffer to write from.
+     */
+    public int outputOffset() {
+        return outOff;
+    }
+
+    /**
+     * @param outOff Offset in output buffer to write from.
+     */
+    public void outputOffset(int outOff) {
+        this.outOff = outOff;
+    }
+
+    /**
+     * @return Length to write to output buffer.
+     */
+    public int outputLength() {
+        return outLen;
+    }
+
+    /**
+     * @param outLen Length to write to output buffer.
+     */
+    public void outputLength(int outLen) {
+        this.outLen = outLen;
+    }
+
+    /**
+     * @param read {@code True} if this is a read future.
+     */
+    public void read(boolean read) {
+        this.read = read;
+    }
+
+    /**
+     * @return {@code True} if this is a read future.
+     */
+    public boolean read() {
+        return read;
+    }
+}