You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:30 UTC
[14/31] incubator-ignite git commit: # IGNITE-386: WIP on internal
namings (2).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
deleted file mode 100644
index e1bf9b6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
-
-/**
- * Utilities for configuring file systems to support the separate working directory per each thread.
- */
-public class GridHadoopFileSystemsUtils {
- /** Name of the property for setting working directory on create new local FS instance. */
- public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
- /**
- * Set user name and default working directory for current thread if it's supported by file system.
- *
- * @param fs File system.
- * @param userName User name.
- */
- public static void setUser(FileSystem fs, String userName) {
- if (fs instanceof IgniteHadoopFileSystem)
- ((IgniteHadoopFileSystem)fs).setUser(userName);
- else if (fs instanceof GridHadoopDistributedFileSystem)
- ((GridHadoopDistributedFileSystem)fs).setUser(userName);
- }
-
- /**
- * Setup wrappers of filesystems to support the separate working directory.
- *
- * @param cfg Config for setup.
- */
- public static void setupFileSystems(Configuration cfg) {
- cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", GridHadoopLocalFileSystemV1.class.getName());
- cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
- GridHadoopLocalFileSystemV2.class.getName());
-
- cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", GridHadoopDistributedFileSystem.class.getName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
deleted file mode 100644
index 28834d4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.fs.*;
-
-import java.io.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV1 extends LocalFileSystem {
- /**
- * Creates new local file system.
- */
- public GridHadoopLocalFileSystemV1() {
- super(new GridHadoopRawLocalFileSystem());
- }
-
- /** {@inheritDoc} */
- @Override public File pathToFile(Path path) {
- return ((GridHadoopRawLocalFileSystem)getRaw()).convert(path);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
deleted file mode 100644
index 62d7cea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.local.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.hadoop.fs.FsConstants.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV2 extends ChecksumFs {
- /**
- * Creates new local file system.
- *
- * @param cfg Configuration.
- * @throws IOException If failed.
- * @throws URISyntaxException If failed.
- */
- public GridHadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
- super(new DelegateFS(cfg));
- }
-
- /**
- * Creates new local file system.
- *
- * @param uri URI.
- * @param cfg Configuration.
- * @throws IOException If failed.
- * @throws URISyntaxException If failed.
- */
- public GridHadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
- this(cfg);
- }
-
- /**
- * Delegate file system.
- */
- private static class DelegateFS extends DelegateToFileSystem {
- /**
- * Creates new local file system.
- *
- * @param cfg Configuration.
- * @throws IOException If failed.
- * @throws URISyntaxException If failed.
- */
- public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
- super(LOCAL_FS_URI, new GridHadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
- }
-
- /** {@inheritDoc} */
- @Override public int getUriDefaultPort() {
- return -1;
- }
-
- /** {@inheritDoc} */
- @Override public FsServerDefaults getServerDefaults() throws IOException {
- return LocalConfigKeys.getServerDefaults();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isValidName(String src) {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
deleted file mode 100644
index 29645f8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.file.*;
-
-/**
- * Local file system implementation for Hadoop.
- */
-public class GridHadoopRawLocalFileSystem extends FileSystem {
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
- @Override protected Path initialValue() {
- return getInitialWorkingDirectory();
- }
- };
-
- /**
- * Converts Hadoop path to local path.
- *
- * @param path Hadoop path.
- * @return Local path.
- */
- File convert(Path path) {
- checkPath(path);
-
- if (path.isAbsolute())
- return new File(path.toUri().getPath());
-
- return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
- }
-
- /** {@inheritDoc} */
- @Override public Path getHomeDirectory() {
- return makeQualified(new Path(System.getProperty("user.home")));
- }
-
- /** {@inheritDoc} */
- @Override public Path getInitialWorkingDirectory() {
- File f = new File(System.getProperty("user.dir"));
-
- return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- setConf(conf);
-
- String initWorkDir = conf.get(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
-
- if (initWorkDir != null)
- setWorkingDirectory(new Path(initWorkDir));
- }
-
- /** {@inheritDoc} */
- @Override public URI getUri() {
- return FsConstants.LOCAL_FS_URI;
- }
-
- /** {@inheritDoc} */
- @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- return new FSDataInputStream(new InStream(checkExists(convert(f))));
- }
-
- /** {@inheritDoc} */
- @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
- short replication, long blockSize, Progressable progress) throws IOException {
- File file = convert(f);
-
- if (!overwrite && !file.createNewFile())
- throw new IOException("Failed to create new file: " + f.toUri());
-
- return out(file, false, bufSize);
- }
-
- /**
- * @param file File.
- * @param append Append flag.
- * @return Output stream.
- * @throws IOException If failed.
- */
- private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
- return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
- bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
- }
-
- /** {@inheritDoc} */
- @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
- return out(convert(f), true, bufSize);
- }
-
- /** {@inheritDoc} */
- @Override public boolean rename(Path src, Path dst) throws IOException {
- return convert(src).renameTo(convert(dst));
- }
-
- /** {@inheritDoc} */
- @Override public boolean delete(Path f, boolean recursive) throws IOException {
- File file = convert(f);
-
- if (file.isDirectory() && !recursive)
- throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
-
- return U.delete(file);
- }
-
- /** {@inheritDoc} */
- @Override public void setWorkingDirectory(Path dir) {
- workDir.set(fixRelativePart(dir));
-
- checkPath(dir);
- }
-
- /** {@inheritDoc} */
- @Override public Path getWorkingDirectory() {
- return workDir.get();
- }
-
- /** {@inheritDoc} */
- @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
- if(f == null)
- throw new IllegalArgumentException("mkdirs path arg is null");
-
- Path parent = f.getParent();
-
- File p2f = convert(f);
-
- if(parent != null) {
- File parent2f = convert(parent);
-
- if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
- throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
-
- }
-
- return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus getFileStatus(Path f) throws IOException {
- return fileStatus(checkExists(convert(f)));
- }
-
- /**
- * @return File status.
- */
- private FileStatus fileStatus(File file) throws IOException {
- boolean dir = file.isDirectory();
-
- java.nio.file.Path path = dir ? null : file.toPath();
-
- return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
- /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
- new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
- }
-
- /**
- * @param file File.
- * @return Same file.
- * @throws FileNotFoundException If does not exist.
- */
- private static File checkExists(File file) throws FileNotFoundException {
- if (!file.exists())
- throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
-
- return file;
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus[] listStatus(Path f) throws IOException {
- File file = convert(f);
-
- if (checkExists(file).isFile())
- return new FileStatus[] {fileStatus(file)};
-
- File[] files = file.listFiles();
-
- FileStatus[] res = new FileStatus[files.length];
-
- for (int i = 0; i < res.length; i++)
- res[i] = fileStatus(files[i]);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean supportsSymlinks() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
- Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
- }
-
- /** {@inheritDoc} */
- @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
- return getFileStatus(getLinkTarget(f));
- }
-
- /** {@inheritDoc} */
- @Override public Path getLinkTarget(Path f) throws IOException {
- File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
-
- return new Path(file.toURI());
- }
-
- /**
- * Input stream.
- */
- private static class InStream extends InputStream implements Seekable, PositionedReadable {
- /** */
- private final RandomAccessFile file;
-
- /**
- * @param f File.
- * @throws IOException If failed.
- */
- public InStream(File f) throws IOException {
- file = new RandomAccessFile(f, "r");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read() throws IOException {
- return file.read();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
- return file.read(b, off, len);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void close() throws IOException {
- file.close();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
- long pos0 = file.getFilePointer();
-
- file.seek(pos);
- int res = file.read(buf, off, len);
-
- file.seek(pos0);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
- if (read(pos, buf, off, len) != len)
- throw new IOException();
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(long pos, byte[] buf) throws IOException {
- readFully(pos, buf, 0, buf.length);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void seek(long pos) throws IOException {
- file.seek(pos);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized long getPos() throws IOException {
- return file.getFilePointer();
- }
-
- /** {@inheritDoc} */
- @Override public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
new file mode 100644
index 0000000..88c5899
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+
+/**
+ * Wrapper of HDFS for support of separated working directory.
+ */
+public class HadoopDistributedFileSystem extends DistributedFileSystem {
+ /** User name for each thread. */
+ private final ThreadLocal<String> userName = new ThreadLocal<String>() {
+ /** {@inheritDoc} */
+ @Override protected String initialValue() {
+ return DFLT_USER_NAME;
+ }
+ };
+
+ /** Working directory for each thread. */
+ private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
+ /** {@inheritDoc} */
+ @Override protected Path initialValue() {
+ return getHomeDirectory();
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+
+ setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+ }
+
+ /**
+ * Set user name and default working directory for current thread.
+ *
+ * @param userName User name.
+ */
+ public void setUser(String userName) {
+ this.userName.set(userName);
+
+ setWorkingDirectory(getHomeDirectory());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getHomeDirectory() {
+ Path path = new Path("/user/" + userName.get());
+
+ return path.makeQualified(getUri(), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path dir) {
+ Path fixedDir = fixRelativePart(dir);
+
+ String res = fixedDir.toUri().getPath();
+
+ if (!DFSUtil.isValidName(res))
+ throw new IllegalArgumentException("Invalid DFS directory name " + res);
+
+ workingDir.set(fixedDir);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return workingDir.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
new file mode 100644
index 0000000..f3f51d4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+
+/**
+ * Utilities for configuring file systems to support the separate working directory per each thread.
+ */
+public class HadoopFileSystemsUtils {
+ /** Name of the property for setting working directory on create new local FS instance. */
+ public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
+
+ /**
+ * Set user name and default working directory for current thread if it's supported by file system.
+ *
+ * @param fs File system.
+ * @param userName User name.
+ */
+ public static void setUser(FileSystem fs, String userName) {
+ if (fs instanceof IgniteHadoopFileSystem)
+ ((IgniteHadoopFileSystem)fs).setUser(userName);
+ else if (fs instanceof HadoopDistributedFileSystem)
+ ((HadoopDistributedFileSystem)fs).setUser(userName);
+ }
+
+ /**
+ * Setup wrappers of filesystems to support the separate working directory.
+ *
+ * @param cfg Config for setup.
+ */
+ public static void setupFileSystems(Configuration cfg) {
+ cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
+ cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
+ HadoopLocalFileSystemV2.class.getName());
+
+ cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
new file mode 100644
index 0000000..9cc5881
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV1 extends LocalFileSystem {
+ /**
+ * Creates new local file system.
+ */
+ public HadoopLocalFileSystemV1() {
+ super(new HadoopRawLocalFileSystem());
+ }
+
+ /** {@inheritDoc} */
+ @Override public File pathToFile(Path path) {
+ return ((HadoopRawLocalFileSystem)getRaw()).convert(path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
new file mode 100644
index 0000000..15ddc5a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.local.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.hadoop.fs.FsConstants.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV2 extends ChecksumFs {
+ /**
+ * Creates new local file system.
+ *
+ * @param cfg Configuration.
+ * @throws IOException If failed.
+ * @throws URISyntaxException If failed.
+ */
+ public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
+ super(new DelegateFS(cfg));
+ }
+
+ /**
+ * Creates new local file system.
+ *
+ * @param uri URI.
+ * @param cfg Configuration.
+ * @throws IOException If failed.
+ * @throws URISyntaxException If failed.
+ */
+ public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
+ this(cfg);
+ }
+
+ /**
+ * Delegate file system.
+ */
+ private static class DelegateFS extends DelegateToFileSystem {
+ /**
+ * Creates new local file system.
+ *
+ * @param cfg Configuration.
+ * @throws IOException If failed.
+ * @throws URISyntaxException If failed.
+ */
+ public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
+ super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getUriDefaultPort() {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FsServerDefaults getServerDefaults() throws IOException {
+ return LocalConfigKeys.getServerDefaults();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isValidName(String src) {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..e5ec3f7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+
+/**
+ * Local file system implementation for Hadoop.
+ */
+public class HadoopRawLocalFileSystem extends FileSystem {
+ /** Working directory for each thread. */
+ private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
+ @Override protected Path initialValue() {
+ return getInitialWorkingDirectory();
+ }
+ };
+
+ /**
+ * Converts Hadoop path to local path.
+ *
+ * @param path Hadoop path.
+ * @return Local path.
+ */
+ File convert(Path path) {
+ checkPath(path);
+
+ if (path.isAbsolute())
+ return new File(path.toUri().getPath());
+
+ return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getHomeDirectory() {
+ return makeQualified(new Path(System.getProperty("user.home")));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getInitialWorkingDirectory() {
+ File f = new File(System.getProperty("user.dir"));
+
+ return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+
+ setConf(conf);
+
+ String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
+
+ if (initWorkDir != null)
+ setWorkingDirectory(new Path(initWorkDir));
+ }
+
+ /** {@inheritDoc} */
+ @Override public URI getUri() {
+ return FsConstants.LOCAL_FS_URI;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return new FSDataInputStream(new InStream(checkExists(convert(f))));
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ File file = convert(f);
+
+ if (!overwrite && !file.createNewFile())
+ throw new IOException("Failed to create new file: " + f.toUri());
+
+ return out(file, false, bufSize);
+ }
+
+ /**
+ * @param file File.
+ * @param append Append flag.
+ * @return Output stream.
+ * @throws IOException If failed.
+ */
+ private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
+ bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
+ return out(convert(f), true, bufSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rename(Path src, Path dst) throws IOException {
+ return convert(src).renameTo(convert(dst));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(Path f, boolean recursive) throws IOException {
+ File file = convert(f);
+
+ if (file.isDirectory() && !recursive)
+ throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
+
+ return U.delete(file);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWorkingDirectory(Path dir) {
+ workDir.set(fixRelativePart(dir));
+
+ checkPath(dir);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getWorkingDirectory() {
+ return workDir.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ if(f == null)
+ throw new IllegalArgumentException("mkdirs path arg is null");
+
+ Path parent = f.getParent();
+
+ File p2f = convert(f);
+
+ if(parent != null) {
+ File parent2f = convert(parent);
+
+ if(parent2f != null && parent2f.exists() && !parent2f.isDirectory())
+ throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
+
+ }
+
+ return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileStatus(Path f) throws IOException {
+ return fileStatus(checkExists(convert(f)));
+ }
+
+ /**
+ * @return File status.
+ */
+ private FileStatus fileStatus(File file) throws IOException {
+ boolean dir = file.isDirectory();
+
+ java.nio.file.Path path = dir ? null : file.toPath();
+
+ return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
+ /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
+ new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
+ }
+
+ /**
+ * @param file File.
+ * @return Same file.
+ * @throws FileNotFoundException If does not exist.
+ */
+ private static File checkExists(File file) throws FileNotFoundException {
+ if (!file.exists())
+ throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
+
+ return file;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus[] listStatus(Path f) throws IOException {
+ File file = convert(f);
+
+ if (checkExists(file).isFile())
+ return new FileStatus[] {fileStatus(file)};
+
+ File[] files = file.listFiles();
+
+ FileStatus[] res = new FileStatus[files.length];
+
+ for (int i = 0; i < res.length; i++)
+ res[i] = fileStatus(files[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean supportsSymlinks() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
+ Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
+ return getFileStatus(getLinkTarget(f));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getLinkTarget(Path f) throws IOException {
+ File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
+
+ return new Path(file.toURI());
+ }
+
+ /**
+ * Input stream.
+ */
+ private static class InStream extends InputStream implements Seekable, PositionedReadable {
+ /** */
+ private final RandomAccessFile file;
+
+ /**
+ * @param f File.
+ * @throws IOException If failed.
+ */
+ public InStream(File f) throws IOException {
+ file = new RandomAccessFile(f, "r");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read() throws IOException {
+ return file.read();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+ return file.read(b, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void close() throws IOException {
+ file.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+ long pos0 = file.getFilePointer();
+
+ file.seek(pos);
+ int res = file.read(buf, off, len);
+
+ file.seek(pos0);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+ if (read(pos, buf, off, len) != len)
+ throw new IOException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(long pos, byte[] buf) throws IOException {
+ readFully(pos, buf, 0, buf.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void seek(long pos) throws IOException {
+ file.seek(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long getPos() throws IOException {
+ return file.getFilePointer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
deleted file mode 100644
index b124312..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- */
-public class GridHadoopJobMetadata implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- private GridHadoopJobId jobId;
-
- /** Job info. */
- private GridHadoopJobInfo jobInfo;
-
- /** Node submitted job. */
- private UUID submitNodeId;
-
- /** Map-reduce plan. */
- private GridHadoopMapReducePlan mrPlan;
-
- /** Pending splits for which mapper should be executed. */
- private Map<GridHadoopInputSplit, Integer> pendingSplits;
-
- /** Pending reducers. */
- private Collection<Integer> pendingReducers;
-
- /** Reducers addresses. */
- @GridToStringInclude
- private Map<Integer, GridHadoopProcessDescriptor> reducersAddrs;
-
- /** Job phase. */
- private GridHadoopJobPhase phase = PHASE_SETUP;
-
- /** Fail cause. */
- @GridToStringExclude
- private Throwable failCause;
-
- /** Version. */
- private long ver;
-
- /** Job counters */
- private GridHadoopCounters counters = new GridHadoopCountersImpl();
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridHadoopJobMetadata() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param submitNodeId Submit node ID.
- * @param jobId Job ID.
- * @param jobInfo Job info.
- */
- public GridHadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
- this.jobId = jobId;
- this.jobInfo = jobInfo;
- this.submitNodeId = submitNodeId;
- }
-
- /**
- * Copy constructor.
- *
- * @param src Metadata to copy.
- */
- public GridHadoopJobMetadata(GridHadoopJobMetadata src) {
- // Make sure to preserve alphabetic order.
- counters = src.counters;
- failCause = src.failCause;
- jobId = src.jobId;
- jobInfo = src.jobInfo;
- mrPlan = src.mrPlan;
- pendingSplits = src.pendingSplits;
- pendingReducers = src.pendingReducers;
- phase = src.phase;
- reducersAddrs = src.reducersAddrs;
- submitNodeId = src.submitNodeId;
- ver = src.ver + 1;
- }
-
- /**
- * @return Submit node ID.
- */
- public UUID submitNodeId() {
- return submitNodeId;
- }
-
- /**
- * @param phase Job phase.
- */
- public void phase(GridHadoopJobPhase phase) {
- this.phase = phase;
- }
-
- /**
- * @return Job phase.
- */
- public GridHadoopJobPhase phase() {
- return phase;
- }
-
- /**
- * Gets reducers addresses for external execution.
- *
- * @return Reducers addresses.
- */
- public Map<Integer, GridHadoopProcessDescriptor> reducersAddresses() {
- return reducersAddrs;
- }
-
- /**
- * Sets reducers addresses for external execution.
- *
- * @param reducersAddrs Map of addresses.
- */
- public void reducersAddresses(Map<Integer, GridHadoopProcessDescriptor> reducersAddrs) {
- this.reducersAddrs = reducersAddrs;
- }
-
- /**
- * Sets collection of pending splits.
- *
- * @param pendingSplits Collection of pending splits.
- */
- public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) {
- this.pendingSplits = pendingSplits;
- }
-
- /**
- * Gets collection of pending splits.
- *
- * @return Collection of pending splits.
- */
- public Map<GridHadoopInputSplit, Integer> pendingSplits() {
- return pendingSplits;
- }
-
- /**
- * Sets collection of pending reducers.
- *
- * @param pendingReducers Collection of pending reducers.
- */
- public void pendingReducers(Collection<Integer> pendingReducers) {
- this.pendingReducers = pendingReducers;
- }
-
- /**
- * Gets collection of pending reducers.
- *
- * @return Collection of pending reducers.
- */
- public Collection<Integer> pendingReducers() {
- return pendingReducers;
- }
-
- /**
- * @return Job ID.
- */
- public GridHadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @param mrPlan Map-reduce plan.
- */
- public void mapReducePlan(GridHadoopMapReducePlan mrPlan) {
- assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
- this.mrPlan = mrPlan;
- }
-
- /**
- * @return Map-reduce plan.
- */
- public GridHadoopMapReducePlan mapReducePlan() {
- return mrPlan;
- }
-
- /**
- * @return Job info.
- */
- public GridHadoopJobInfo jobInfo() {
- return jobInfo;
- }
-
- /**
- * Returns job counters.
- *
- * @return Collection of counters.
- */
- public GridHadoopCounters counters() {
- return counters;
- }
-
- /**
- * Sets counters.
- *
- * @param counters Collection of counters.
- */
- public void counters(GridHadoopCounters counters) {
- this.counters = counters;
- }
-
- /**
- * @param failCause Fail cause.
- */
- public void failCause(Throwable failCause) {
- assert failCause != null;
-
- if (this.failCause == null) // Keep the first error.
- this.failCause = failCause;
- }
-
- /**
- * @return Fail cause.
- */
- public Throwable failCause() {
- return failCause;
- }
-
- /**
- * @return Version.
- */
- public long version() {
- return ver;
- }
-
- /**
- * @param split Split.
- * @return Task number.
- */
- public int taskNumber(GridHadoopInputSplit split) {
- return pendingSplits.get(split);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeUuid(out, submitNodeId);
- out.writeObject(jobId);
- out.writeObject(jobInfo);
- out.writeObject(mrPlan);
- out.writeObject(pendingSplits);
- out.writeObject(pendingReducers);
- out.writeObject(phase);
- out.writeObject(failCause);
- out.writeLong(ver);
- out.writeObject(reducersAddrs);
- out.writeObject(counters);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- submitNodeId = U.readUuid(in);
- jobId = (GridHadoopJobId)in.readObject();
- jobInfo = (GridHadoopJobInfo)in.readObject();
- mrPlan = (GridHadoopMapReducePlan)in.readObject();
- pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject();
- pendingReducers = (Collection<Integer>)in.readObject();
- phase = (GridHadoopJobPhase)in.readObject();
- failCause = (Throwable)in.readObject();
- ver = in.readLong();
- reducersAddrs = (Map<Integer, GridHadoopProcessDescriptor>)in.readObject();
- counters = (GridHadoopCounters)in.readObject();
- }
-
- /** {@inheritDoc} */
- public String toString() {
- return S.toString(GridHadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
- "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
- failCause.getClass().getName());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
new file mode 100644
index 0000000..6042775
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.jobtracker;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
+
+/**
+ * Hadoop job metadata. Internal object used for distributed job state tracking.
+ */
+public class HadoopJobMetadata implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ private GridHadoopJobId jobId;
+
+ /** Job info. */
+ private GridHadoopJobInfo jobInfo;
+
+ /** Node submitted job. */
+ private UUID submitNodeId;
+
+ /** Map-reduce plan. */
+ private GridHadoopMapReducePlan mrPlan;
+
+ /** Pending splits for which mapper should be executed. */
+ private Map<GridHadoopInputSplit, Integer> pendingSplits;
+
+ /** Pending reducers. */
+ private Collection<Integer> pendingReducers;
+
+ /** Reducers addresses. */
+ @GridToStringInclude
+ private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
+
+ /** Job phase. */
+ private GridHadoopJobPhase phase = PHASE_SETUP;
+
+ /** Fail cause. */
+ @GridToStringExclude
+ private Throwable failCause;
+
+ /** Version. */
+ private long ver;
+
+ /** Job counters */
+ private GridHadoopCounters counters = new HadoopCountersImpl();
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public HadoopJobMetadata() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param submitNodeId Submit node ID.
+ * @param jobId Job ID.
+ * @param jobInfo Job info.
+ */
+ public HadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
+ this.jobId = jobId;
+ this.jobInfo = jobInfo;
+ this.submitNodeId = submitNodeId;
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param src Metadata to copy.
+ */
+ public HadoopJobMetadata(HadoopJobMetadata src) {
+ // Make sure to preserve alphabetic order.
+ counters = src.counters;
+ failCause = src.failCause;
+ jobId = src.jobId;
+ jobInfo = src.jobInfo;
+ mrPlan = src.mrPlan;
+ pendingSplits = src.pendingSplits;
+ pendingReducers = src.pendingReducers;
+ phase = src.phase;
+ reducersAddrs = src.reducersAddrs;
+ submitNodeId = src.submitNodeId;
+ ver = src.ver + 1;
+ }
+
+ /**
+ * @return Submit node ID.
+ */
+ public UUID submitNodeId() {
+ return submitNodeId;
+ }
+
+ /**
+ * @param phase Job phase.
+ */
+ public void phase(GridHadoopJobPhase phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * @return Job phase.
+ */
+ public GridHadoopJobPhase phase() {
+ return phase;
+ }
+
+ /**
+ * Gets reducers addresses for external execution.
+ *
+ * @return Reducers addresses.
+ */
+ public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
+ return reducersAddrs;
+ }
+
+ /**
+ * Sets reducers addresses for external execution.
+ *
+ * @param reducersAddrs Map of addresses.
+ */
+ public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
+ this.reducersAddrs = reducersAddrs;
+ }
+
+ /**
+ * Sets collection of pending splits.
+ *
+ * @param pendingSplits Collection of pending splits.
+ */
+ public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) {
+ this.pendingSplits = pendingSplits;
+ }
+
+ /**
+ * Gets collection of pending splits.
+ *
+ * @return Collection of pending splits.
+ */
+ public Map<GridHadoopInputSplit, Integer> pendingSplits() {
+ return pendingSplits;
+ }
+
+ /**
+ * Sets collection of pending reducers.
+ *
+ * @param pendingReducers Collection of pending reducers.
+ */
+ public void pendingReducers(Collection<Integer> pendingReducers) {
+ this.pendingReducers = pendingReducers;
+ }
+
+ /**
+ * Gets collection of pending reducers.
+ *
+ * @return Collection of pending reducers.
+ */
+ public Collection<Integer> pendingReducers() {
+ return pendingReducers;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @param mrPlan Map-reduce plan.
+ */
+ public void mapReducePlan(GridHadoopMapReducePlan mrPlan) {
+ assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
+
+ this.mrPlan = mrPlan;
+ }
+
+ /**
+ * @return Map-reduce plan.
+ */
+ public GridHadoopMapReducePlan mapReducePlan() {
+ return mrPlan;
+ }
+
+ /**
+ * @return Job info.
+ */
+ public GridHadoopJobInfo jobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * Returns job counters.
+ *
+ * @return Collection of counters.
+ */
+ public GridHadoopCounters counters() {
+ return counters;
+ }
+
+ /**
+ * Sets counters.
+ *
+ * @param counters Collection of counters.
+ */
+ public void counters(GridHadoopCounters counters) {
+ this.counters = counters;
+ }
+
+ /**
+ * @param failCause Fail cause.
+ */
+ public void failCause(Throwable failCause) {
+ assert failCause != null;
+
+ if (this.failCause == null) // Keep the first error.
+ this.failCause = failCause;
+ }
+
+ /**
+ * @return Fail cause.
+ */
+ public Throwable failCause() {
+ return failCause;
+ }
+
+ /**
+ * @return Version.
+ */
+ public long version() {
+ return ver;
+ }
+
+ /**
+ * @param split Split.
+ * @return Task number.
+ */
+ public int taskNumber(GridHadoopInputSplit split) {
+ return pendingSplits.get(split);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeUuid(out, submitNodeId);
+ out.writeObject(jobId);
+ out.writeObject(jobInfo);
+ out.writeObject(mrPlan);
+ out.writeObject(pendingSplits);
+ out.writeObject(pendingReducers);
+ out.writeObject(phase);
+ out.writeObject(failCause);
+ out.writeLong(ver);
+ out.writeObject(reducersAddrs);
+ out.writeObject(counters);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ submitNodeId = U.readUuid(in);
+ jobId = (GridHadoopJobId)in.readObject();
+ jobInfo = (GridHadoopJobInfo)in.readObject();
+ mrPlan = (GridHadoopMapReducePlan)in.readObject();
+ pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject();
+ pendingReducers = (Collection<Integer>)in.readObject();
+ phase = (GridHadoopJobPhase)in.readObject();
+ failCause = (Throwable)in.readObject();
+ ver = in.readLong();
+ reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
+ counters = (GridHadoopCounters)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
+ "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
+ failCause.getClass().getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 91a2d6f..a0ae3f6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
/**
* Hadoop job tracker.
@@ -56,10 +56,10 @@ public class HadoopJobTracker extends HadoopComponent {
private final GridMutex mux = new GridMutex();
/** */
- private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
+ private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaPrj;
/** Projection with expiry policy for finished job updates. */
- private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
+ private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
/** Map-reduce execution planner. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -106,8 +106,8 @@ public class HadoopJobTracker extends HadoopComponent {
* @return Job meta projection.
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
- private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
- GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
+ private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaCache() {
+ GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;
if (prj == null) {
synchronized (mux) {
@@ -128,8 +128,8 @@ public class HadoopJobTracker extends HadoopComponent {
throw new IllegalStateException(e);
}
- jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)
- sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
+ jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata>)
+ sysCache.projection(GridHadoopJobId.class, HadoopJobMetadata.class);
if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
@@ -149,8 +149,8 @@ public class HadoopJobTracker extends HadoopComponent {
/**
* @return Projection with expiry policy for finished job updates.
*/
- private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
- GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
+ private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaCache() {
+ GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj;
if (prj == null) {
jobMetaCache();
@@ -169,9 +169,9 @@ public class HadoopJobTracker extends HadoopComponent {
super.onKernalStart();
jobMetaCache().context().continuousQueries().executeInternalQuery(
- new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() {
+ new CacheEntryUpdatedListener<GridHadoopJobId, HadoopJobMetadata>() {
@Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId,
- ? extends GridHadoopJobMetadata>> evts) {
+ ? extends HadoopJobMetadata>> evts) {
if (!busyLock.tryReadLock())
return;
@@ -250,7 +250,7 @@ public class HadoopJobTracker extends HadoopComponent {
GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
- GridHadoopJobMetadata meta = new GridHadoopJobMetadata(ctx.localNodeId(), jobId, info);
+ HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
meta.mapReducePlan(mrPlan);
@@ -268,7 +268,7 @@ public class HadoopJobTracker extends HadoopComponent {
long jobStart = U.currentTimeMillis();
- GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(meta.counters(),
+ HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(),
ctx.localNodeId());
perfCntr.clientSubmissionEvents(info);
@@ -297,7 +297,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @return Status.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public static GridHadoopJobStatus status(GridHadoopJobMetadata meta) {
+ public static GridHadoopJobStatus status(HadoopJobMetadata meta) {
GridHadoopJobInfo jobInfo = meta.jobInfo();
return new GridHadoopJobStatus(
@@ -325,7 +325,7 @@ public class HadoopJobTracker extends HadoopComponent {
return null; // Grid is stopping.
try {
- GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+ HadoopJobMetadata meta = jobMetaCache().get(jobId);
return meta != null ? status(meta) : null;
}
@@ -346,7 +346,7 @@ public class HadoopJobTracker extends HadoopComponent {
return null; // Grid is stopping.
try {
- GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+ HadoopJobMetadata meta = jobMetaCache().get(jobId);
if (meta == null)
return null;
@@ -400,7 +400,7 @@ public class HadoopJobTracker extends HadoopComponent {
return null;
try {
- GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+ HadoopJobMetadata meta = jobMetaCache().get(jobId);
if (meta != null)
return meta.mapReducePlan();
@@ -419,7 +419,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param status Task status.
*/
@SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
- public void onTaskFinished(GridHadoopTaskInfo info, GridHadoopTaskStatus status) {
+ public void onTaskFinished(GridHadoopTaskInfo info, HadoopTaskStatus status) {
if (!busyLock.tryReadLock())
return;
@@ -470,7 +470,7 @@ public class HadoopJobTracker extends HadoopComponent {
case COMMIT:
case ABORT: {
- GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
+ GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache();
cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
listenAsync(failsLog);
@@ -488,7 +488,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param jobId Job id.
* @param c Closure of operation.
*/
- private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) {
+ private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void> c) {
jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
}
@@ -500,7 +500,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param desc Process descriptor.
*/
public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers,
- GridHadoopProcessDescriptor desc) {
+ HadoopProcessDescriptor desc) {
transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
}
@@ -557,7 +557,7 @@ public class HadoopJobTracker extends HadoopComponent {
// Iteration over all local entries is correct since system cache is REPLICATED.
for (Object metaObj : jobMetaCache().values()) {
- GridHadoopJobMetadata meta = (GridHadoopJobMetadata)metaObj;
+ HadoopJobMetadata meta = (HadoopJobMetadata)metaObj;
GridHadoopJobId jobId = meta.jobId();
@@ -626,13 +626,13 @@ public class HadoopJobTracker extends HadoopComponent {
* @throws IgniteCheckedException If failed.
*/
private void processJobMetadataUpdates(
- Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated)
+ Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata>> updated)
throws IgniteCheckedException {
UUID locNodeId = ctx.localNodeId();
- for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) {
+ for (CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata> entry : updated) {
GridHadoopJobId jobId = entry.getKey();
- GridHadoopJobMetadata meta = entry.getValue();
+ HadoopJobMetadata meta = entry.getValue();
if (meta == null || !ctx.isParticipating(meta))
continue;
@@ -689,7 +689,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param locNodeId Local node ID.
* @throws IgniteCheckedException If failed.
*/
- private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId)
+ private void processJobMetaUpdate(GridHadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId)
throws IgniteCheckedException {
JobLocalState state = activeJobs.get(jobId);
@@ -879,7 +879,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param meta Job metadata.
* @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
*/
- private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, GridHadoopJobMetadata meta) {
+ private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, HadoopJobMetadata meta) {
UUID locNodeId = ctx.localNodeId();
GridHadoopJobId jobId = meta.jobId();
@@ -978,7 +978,7 @@ public class HadoopJobTracker extends HadoopComponent {
try {
if (jobInfo == null) {
- GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+ HadoopJobMetadata meta = jobMetaCache().get(jobId);
if (meta == null)
throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);
@@ -1024,7 +1024,7 @@ public class HadoopJobTracker extends HadoopComponent {
return false; // Grid is stopping.
try {
- GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+ HadoopJobMetadata meta = jobMetaCache().get(jobId);
if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled.");
@@ -1063,7 +1063,7 @@ public class HadoopJobTracker extends HadoopComponent {
return null;
try {
- final GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+ final HadoopJobMetadata meta = jobMetaCache().get(jobId);
return meta != null ? meta.counters() : null;
}
@@ -1158,7 +1158,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param status Task status.
* @param prev Previous closure.
*/
- private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
+ private void onSetupFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
final GridHadoopJobId jobId = taskInfo.jobId();
if (status.state() == FAILED || status.state() == CRASHED)
@@ -1172,7 +1172,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param status Task status.
* @param prev Previous closure.
*/
- private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
+ private void onMapFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status,
final StackedProcessor prev) {
final GridHadoopJobId jobId = taskInfo.jobId();
@@ -1213,7 +1213,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param status Task status.
* @param prev Previous closure.
*/
- private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
+ private void onReduceFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
GridHadoopJobId jobId = taskInfo.jobId();
if (status.state() == FAILED || status.state() == CRASHED)
// Fail the whole job.
@@ -1227,7 +1227,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param status Task status.
* @param prev Previous closure.
*/
- private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
+ private void onCombineFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status,
final StackedProcessor prev) {
final GridHadoopJobId jobId = taskInfo.jobId();
@@ -1302,7 +1302,7 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
cp.phase(phase);
}
}
@@ -1343,7 +1343,7 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
for (GridHadoopInputSplit s : splits)
@@ -1400,7 +1400,7 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
rdcCp.remove(rdc);
@@ -1425,7 +1425,7 @@ public class HadoopJobTracker extends HadoopComponent {
private final Collection<Integer> rdc;
/** Process descriptor for reducers. */
- private final GridHadoopProcessDescriptor desc;
+ private final HadoopProcessDescriptor desc;
/**
* @param prev Previous closure.
@@ -1434,7 +1434,7 @@ public class HadoopJobTracker extends HadoopComponent {
*/
private InitializeReducersProcessor(@Nullable StackedProcessor prev,
Collection<Integer> rdc,
- GridHadoopProcessDescriptor desc) {
+ HadoopProcessDescriptor desc) {
super(prev);
assert !F.isEmpty(rdc);
@@ -1445,11 +1445,11 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
- Map<Integer, GridHadoopProcessDescriptor> oldMap = meta.reducersAddresses();
+ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+ Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses();
- Map<Integer, GridHadoopProcessDescriptor> rdcMap = oldMap == null ?
- new HashMap<Integer, GridHadoopProcessDescriptor>() : new HashMap<>(oldMap);
+ Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ?
+ new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap);
for (Integer r : rdc)
rdcMap.put(r, desc);
@@ -1511,7 +1511,7 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta;
Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
@@ -1560,8 +1560,8 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
- GridHadoopCounters cntrs = new GridHadoopCountersImpl(cp.counters());
+ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+ GridHadoopCounters cntrs = new HadoopCountersImpl(cp.counters());
cntrs.merge(counters);
@@ -1573,7 +1573,7 @@ public class HadoopJobTracker extends HadoopComponent {
* Abstract stacked closure.
*/
private abstract static class StackedProcessor implements
- EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable {
+ EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void>, Serializable {
/** */
private static final long serialVersionUID = 0L;
@@ -1588,8 +1588,8 @@ public class HadoopJobTracker extends HadoopComponent {
}
/** {@inheritDoc} */
- @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) {
- GridHadoopJobMetadata val = apply(e.getValue());
+ @Override public Void process(MutableEntry<GridHadoopJobId, HadoopJobMetadata> e, Object... args) {
+ HadoopJobMetadata val = apply(e.getValue());
if (val != null)
e.setValue(val);
@@ -1603,11 +1603,11 @@ public class HadoopJobTracker extends HadoopComponent {
* @param meta Old value.
* @return New value.
*/
- private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
+ private HadoopJobMetadata apply(HadoopJobMetadata meta) {
if (meta == null)
return null;
- GridHadoopJobMetadata cp = prev != null ? prev.apply(meta) : new GridHadoopJobMetadata(meta);
+ HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta);
update(meta, cp);
@@ -1620,6 +1620,6 @@ public class HadoopJobTracker extends HadoopComponent {
* @param meta Initial job metadata.
* @param cp Copy.
*/
- protected abstract void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp);
+ protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
deleted file mode 100644
index 1670a8a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.message;
-
-import java.io.*;
-
-/**
- * Marker interface for all hadoop messages.
- */
-public interface GridHadoopMessage extends Externalizable {
- // No-op.
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
new file mode 100644
index 0000000..cab6138
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.message;
+
+import java.io.*;
+
+/**
+ * Marker interface for all hadoop messages.
+ */
+public interface HadoopMessage extends Externalizable {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
deleted file mode 100644
index 7988403..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Map-reduce plan.
- */
-public class GridHadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Mappers map. */
- private Map<UUID, Collection<GridHadoopInputSplit>> mappers;
-
- /** Reducers map. */
- private Map<UUID, int[]> reducers;
-
- /** Mappers count. */
- private int mappersCnt;
-
- /** Reducers count. */
- private int reducersCnt;
-
- /**
- * @param mappers Mappers map.
- * @param reducers Reducers map.
- */
- public GridHadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers,
- Map<UUID, int[]> reducers) {
- this.mappers = mappers;
- this.reducers = reducers;
-
- if (mappers != null) {
- for (Collection<GridHadoopInputSplit> splits : mappers.values())
- mappersCnt += splits.size();
- }
-
- if (reducers != null) {
- for (int[] rdcrs : reducers.values())
- reducersCnt += rdcrs.length;
- }
- }
-
- /** {@inheritDoc} */
- @Override public int mappers() {
- return mappersCnt;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- return reducersCnt;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeForReducer(int reducer) {
- assert reducer >= 0 && reducer < reducersCnt : reducer;
-
- for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
- for (int r : entry.getValue()) {
- if (r == reducer)
- return entry.getKey();
- }
- }
-
- throw new IllegalStateException("Not found reducer index: " + reducer);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) {
- return mappers.get(nodeId);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public int[] reducers(UUID nodeId) {
- return reducers.get(nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<UUID> mapperNodeIds() {
- return mappers.keySet();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<UUID> reducerNodeIds() {
- return reducers.keySet();
- }
-}