You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/04 16:35:35 UTC
[33/45] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java
deleted file mode 100644
index 66b1db4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.file.*;
-import java.text.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.IgniteVersionUtils.*;
-
-/**
- * Setup tool to configure Hadoop client.
- */
-public class GridHadoopSetup {
- /** */
- public static final String WINUTILS_EXE = "winutils.exe";
-
- /** */
- private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return name.startsWith("ignite-") && name.endsWith(".jar");
- }
- };
-
- /**
- * The main method.
- * @param ignore Params.
- */
- public static void main(String[] ignore) {
- X.println(
- " __________ ________________ ",
- " / _/ ___/ |/ / _/_ __/ __/ ",
- " _/ // (_ / // / / / / _/ ",
- "/___/\\___/_/|_/___/ /_/ /___/ ",
- " for Apache Hadoop ",
- " ");
-
- println("Version " + ACK_VER_STR);
-
- configureHadoop();
- }
-
- /**
- * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop.
- * It performs these operations:
- * <ul>
- * <li>Check for setting of HADOOP_HOME environment variable.</li>
- * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
- * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li>
- * <li>In Windows check new line character issues in CMD scripts.</li>
- * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li>
- * </ul>
- */
- private static void configureHadoop() {
- String igniteHome = U.getIgniteHome();
-
- println("IGNITE_HOME is set to '" + igniteHome + "'.");
-
- checkIgniteHome(igniteHome);
-
- String homeVar = "HADOOP_HOME";
- String hadoopHome = System.getenv(homeVar);
-
- if (F.isEmpty(hadoopHome)) {
- homeVar = "HADOOP_PREFIX";
- hadoopHome = System.getenv(homeVar);
- }
-
- if (F.isEmpty(hadoopHome))
- exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
- "valid Hadoop installation directory and run setup tool again.", null);
-
- hadoopHome = hadoopHome.replaceAll("\"", "");
-
- println(homeVar + " is set to '" + hadoopHome + "'.");
-
- String hiveHome = System.getenv("HIVE_HOME");
-
- if (!F.isEmpty(hiveHome)) {
- hiveHome = hiveHome.replaceAll("\"", "");
-
- println("HIVE_HOME is set to '" + hiveHome + "'.");
- }
-
- File hadoopDir = new File(hadoopHome);
-
- if (!hadoopDir.exists())
- exit("Hadoop installation folder does not exist.", null);
-
- if (!hadoopDir.isDirectory())
- exit("HADOOP_HOME must point to a directory.", null);
-
- if (!hadoopDir.canRead())
- exit("Hadoop installation folder can not be read. Please check permissions.", null);
-
- File hadoopCommonDir;
-
- String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
-
- if (F.isEmpty(hadoopCommonHome)) {
- hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
-
- println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
- }
- else {
- println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
-
- hadoopCommonDir = new File(hadoopCommonHome);
- }
-
- if (!hadoopCommonDir.canRead())
- exit("Failed to read Hadoop common dir in '" + hadoopCommonHome + "'.", null);
-
- File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
-
- if (!hadoopCommonLibDir.canRead())
- exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
-
- if (U.isWindows()) {
- checkJavaPathSpaces();
-
- File hadoopBinDir = new File(hadoopDir, "bin");
-
- if (!hadoopBinDir.canRead())
- exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
-
- File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
-
- if (!winutilsFile.exists()) {
- if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
- "It may be replaced by a stub. Create it?")) {
- println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
-
- boolean ok = false;
-
- try {
- ok = winutilsFile.createNewFile();
- }
- catch (IOException ignore) {
- // No-op.
- }
-
- if (!ok)
- exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
- }
- else
- println("Ok. But Hadoop client probably will not work on Windows this way...");
- }
-
- processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
- }
-
- File igniteLibs = new File(new File(igniteHome), "libs");
-
- if (!igniteLibs.exists())
- exit("Ignite 'libs' folder is not found.", null);
-
- Collection<File> jarFiles = new ArrayList<>();
-
- addJarsInFolder(jarFiles, igniteLibs);
- addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
-
- boolean jarsLinksCorrect = true;
-
- for (File file : jarFiles) {
- File link = new File(hadoopCommonLibDir, file.getName());
-
- jarsLinksCorrect &= isJarLinkCorrect(link, file);
-
- if (!jarsLinksCorrect)
- break;
- }
-
- if (!jarsLinksCorrect) {
- if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
- "Create appropriate symbolic links?")) {
- File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);
-
- if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
- "installation. They must be deleted to continue. Continue?")) {
- for (File file : oldIgniteJarFiles) {
- println("Deleting file '" + file.getAbsolutePath() + "'.");
-
- if (!file.delete())
- exit("Failed to delete file '" + file.getPath() + "'.", null);
- }
- }
-
- for (File file : jarFiles) {
- File targetFile = new File(hadoopCommonLibDir, file.getName());
-
- try {
- println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");
-
- Files.createSymbolicLink(targetFile.toPath(), file.toPath());
- }
- catch (IOException e) {
- if (U.isWindows()) {
- warn("Ability to create symbolic links is required!");
- warn("On Windows platform you have to grant permission 'Create symbolic links'");
- warn("to your user or run the Accelerator as Administrator.");
- }
-
- exit("Creating symbolic link failed! Check permissions.", e);
- }
- }
- }
- else
- println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
- }
-
- File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");
-
- File igniteDocs = new File(igniteHome, "docs");
-
- if (!igniteDocs.canRead())
- exit("Failed to read Ignite 'docs' folder at '" + igniteDocs.getAbsolutePath() + "'.", null);
-
- if (hadoopEtc.canWrite()) { // TODO Bigtop
- if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
- "(existing files will be backed up)?")) {
- replaceWithBackup(new File(igniteDocs, "core-site.ignite.xml"), new File(hadoopEtc, "core-site.xml"));
-
- replaceWithBackup(new File(igniteDocs, "mapred-site.ignite.xml"), new File(hadoopEtc, "mapred-site.xml"));
- }
- else
- println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
- }
-
- if (!F.isEmpty(hiveHome)) {
- File hiveConfDir = new File(hiveHome + File.separator + "conf");
-
- if (!hiveConfDir.canWrite())
- warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
- "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
- else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
- replaceWithBackup(new File(igniteDocs, "hive-site.ignite.xml"), new File(hiveConfDir, "hive-site.xml"));
- else
- println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
- }
-
- println("Apache Hadoop setup is complete.");
- }
-
- /**
- * @param jarFiles Jars.
- * @param folder Folder.
- */
- private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
- if (!folder.exists())
- exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
-
- jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS)));
- }
-
- /**
- * Checks that JAVA_HOME does not contain space characters.
- */
- private static void checkJavaPathSpaces() {
- String javaHome = System.getProperty("java.home");
-
- if (javaHome.contains(" ")) {
- warn("Java installation path contains space characters!");
- warn("Hadoop client will not be able to start using '" + javaHome + "'.");
- warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
- }
- }
-
- /**
- * Checks Ignite home.
- *
- * @param igniteHome Ignite home.
- */
- private static void checkIgniteHome(String igniteHome) {
- URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();
-
- try {
- Path jar = Paths.get(jarUrl.toURI());
- Path igHome = Paths.get(igniteHome);
-
- if (!jar.startsWith(igHome))
- exit("Ignite JAR files are not under IGNITE_HOME.", null);
- }
- catch (Exception e) {
- exit(e.getMessage(), e);
- }
- }
-
- /**
- * Replaces target file with source file.
- *
- * @param from From.
- * @param to To.
- */
- private static void replaceWithBackup(File from, File to) {
- if (!from.canRead())
- exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
-
- println("Replacing file '" + to.getAbsolutePath() + "'.");
-
- try {
- U.copy(from, renameToBak(to), true);
- }
- catch (IOException e) {
- exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
- }
- }
-
- /**
- * Renames file for backup.
- *
- * @param file File.
- * @return File.
- */
- private static File renameToBak(File file) {
- DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
-
- if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
- exit("Failed to rename file '" + file.getPath() + "'.", null);
-
- return file;
- }
-
- /**
- * Checks if link is correct.
- *
- * @param link Symbolic link.
- * @param correctTarget Correct link target.
- * @return {@code true} If link target is correct.
- */
- private static boolean isJarLinkCorrect(File link, File correctTarget) {
- if (!Files.isSymbolicLink(link.toPath()))
- return false; // It is a real file or it does not exist.
-
- Path target = null;
-
- try {
- target = Files.readSymbolicLink(link.toPath());
- }
- catch (IOException e) {
- exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
- }
-
- return Files.exists(target) && target.toFile().equals(correctTarget);
- }
-
- /**
- * Writes the question end read the boolean answer from the console.
- *
- * @param question Question to write.
- * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
- */
- private static boolean ask(String question) {
- X.println();
- X.print(" < " + question + " (Y/N): ");
-
- String answer = null;
-
- if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
- answer = "Y";
- else {
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
-
- try {
- answer = br.readLine();
- }
- catch (IOException e) {
- exit("Failed to read answer: " + e.getMessage(), e);
- }
- }
-
- if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
- X.println(" > Yes.");
-
- return true;
- }
- else {
- X.println(" > No.");
-
- return false;
- }
- }
-
- /**
- * Exit with message.
- *
- * @param msg Exit message.
- */
- private static void exit(String msg, Exception e) {
- X.println(" ");
- X.println(" # " + msg);
- X.println(" # Setup failed, exiting... ");
-
- if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
- e.printStackTrace();
-
- System.exit(1);
- }
-
- /**
- * Prints message.
- *
- * @param msg Message.
- */
- private static void println(String msg) {
- X.println(" > " + msg);
- }
-
- /**
- * Prints warning.
- *
- * @param msg Message.
- */
- private static void warn(String msg) {
- X.println(" ! " + msg);
- }
-
- /**
- * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the
- * answer. If it's 'Y' then backups original files and corrects invalid new line characters.
- *
- * @param rootDir Root directory to process.
- * @param dirs Directories inside of the root to process.
- */
- private static void processCmdFiles(File rootDir, String... dirs) {
- boolean answer = false;
-
- for (String dir : dirs) {
- File subDir = new File(rootDir, dir);
-
- File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(".cmd");
- }
- });
-
- for (File file : cmdFiles) {
- String content = null;
-
- try (Scanner scanner = new Scanner(file)) {
- content = scanner.useDelimiter("\\Z").next();
- }
- catch (FileNotFoundException e) {
- exit("Failed to read file '" + file + "'.", e);
- }
-
- boolean invalid = false;
-
- for (int i = 0; i < content.length(); i++) {
- if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
- invalid = true;
-
- break;
- }
- }
-
- if (invalid) {
- answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?");
-
- if (!answer) {
- println("Ok. But Windows most probably will fail to execute them...");
-
- return;
- }
-
- println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
-
- renameToBak(file);
-
- try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
- for (int i = 0; i < content.length(); i++) {
- if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
- writer.write("\r");
-
- writer.write(content.charAt(i));
- }
- }
- catch (IOException e) {
- exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java
deleted file mode 100644
index c762181..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-
-/**
- * Exception that throws when the task is cancelling.
- */
-public class GridHadoopTaskCancelledException extends IgniteException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param msg Exception message.
- */
- public GridHadoopTaskCancelledException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java
deleted file mode 100644
index 763f45a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop utility methods.
- */
-public class GridHadoopUtils {
- /** Property to store timestamp of new job id request. */
- public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
-
- /** Property to store timestamp of response of new job id request. */
- public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
-
- /** Property to store timestamp of job submission. */
- public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
-
- /** Property to set custom writer of job statistics. */
- public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
-
- /** Staging constant. */
- private static final String STAGING_CONSTANT = ".staging";
-
- /** Old mapper class attribute. */
- private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
-
- /** Old reducer class attribute. */
- private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
-
- /**
- * Wraps native split.
- *
- * @param id Split ID.
- * @param split Split.
- * @param hosts Hosts.
- * @throws IOException If failed.
- */
- public static GridHadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
- ByteArrayOutputStream arr = new ByteArrayOutputStream();
- ObjectOutput out = new ObjectOutputStream(arr);
-
- assert split instanceof Writable;
-
- ((Writable)split).write(out);
-
- out.flush();
-
- return new GridHadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
- }
-
- /**
- * Unwraps native split.
- *
- * @param o Wrapper.
- * @return Split.
- */
- public static Object unwrapSplit(GridHadoopSplitWrapper o) {
- try {
- Writable w = (Writable)GridHadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
-
- w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
-
- return w;
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- /**
- * Convert Ignite job status to Hadoop job status.
- *
- * @param status Ignite job status.
- * @return Hadoop job status.
- */
- public static JobStatus status(GridHadoopJobStatus status, Configuration conf) {
- JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
-
- float setupProgress = 0;
- float mapProgress = 0;
- float reduceProgress = 0;
- float cleanupProgress = 0;
-
- JobStatus.State state = JobStatus.State.RUNNING;
-
- switch (status.jobPhase()) {
- case PHASE_SETUP:
- setupProgress = 0.42f;
-
- break;
-
- case PHASE_MAP:
- setupProgress = 1;
- mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
-
- break;
-
- case PHASE_REDUCE:
- assert status.totalReducerCnt() > 0;
-
- setupProgress = 1;
- mapProgress = 1;
- reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
-
- break;
-
- case PHASE_CANCELLING:
- case PHASE_COMPLETE:
- if (!status.isFailed()) {
- setupProgress = 1;
- mapProgress = 1;
- reduceProgress = 1;
- cleanupProgress = 1;
-
- state = JobStatus.State.SUCCEEDED;
- }
- else
- state = JobStatus.State.FAILED;
-
- break;
-
- default:
- assert false;
- }
-
- return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
- JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
- }
-
- /**
- * Gets staging area directory.
- *
- * @param conf Configuration.
- * @param usr User.
- * @return Staging area directory.
- */
- public static Path stagingAreaDir(Configuration conf, String usr) {
- return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
- + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
- }
-
- /**
- * Gets job file.
- *
- * @param conf Configuration.
- * @param usr User.
- * @param jobId Job ID.
- * @return Job file.
- */
- public static Path jobFile(Configuration conf, String usr, JobID jobId) {
- return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
- }
-
- /**
- * Checks the attribute in configuration is not set.
- *
- * @param attr Attribute name.
- * @param msg Message for creation of exception.
- * @throws IgniteCheckedException If attribute is set.
- */
- public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
- if (cfg.get(attr) != null)
- throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
- }
-
- /**
- * Creates JobInfo from hadoop configuration.
- *
- * @param cfg Hadoop configuration.
- * @return Job info.
- * @throws IgniteCheckedException If failed.
- */
- public static GridHadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
- JobConf jobConf = new JobConf(cfg);
-
- boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
- || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
-
- int numReduces = jobConf.getNumReduceTasks();
-
- jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
-
- if (jobConf.getUseNewMapper()) {
- String mode = "new map API";
-
- ensureNotSet(jobConf, "mapred.input.format.class", mode);
- ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
-
- if (numReduces != 0)
- ensureNotSet(jobConf, "mapred.partitioner.class", mode);
- else
- ensureNotSet(jobConf, "mapred.output.format.class", mode);
- }
- else {
- String mode = "map compatibility";
-
- ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
- ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
-
- if (numReduces != 0)
- ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
- else
- ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
- }
-
- if (numReduces != 0) {
- jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
-
- if (jobConf.getUseNewReducer()) {
- String mode = "new reduce API";
-
- ensureNotSet(jobConf, "mapred.output.format.class", mode);
- ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
- }
- else {
- String mode = "reduce compatibility";
-
- ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
- ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
- }
- }
-
- Map<String, String> props = new HashMap<>();
-
- for (Map.Entry<String, String> entry : jobConf)
- props.put(entry.getKey(), entry.getValue());
-
- return new GridHadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
- }
-
- /**
- * Throws new {@link IgniteCheckedException} with original exception is serialized into string.
- * This is needed to transfer error outside the current class loader.
- *
- * @param e Original exception.
- * @return IgniteCheckedException New exception.
- */
- public static IgniteCheckedException transformException(Throwable e) {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
-
- e.printStackTrace(new PrintStream(os, true));
-
- return new IgniteCheckedException(os.toString());
- }
-
- /**
- * Returns work directory for job execution.
- *
- * @param locNodeId Local node ID.
- * @param jobId Job ID.
- * @return Working directory for job.
- * @throws IgniteCheckedException If Failed.
- */
- public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws IgniteCheckedException {
- return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
- }
-
- /**
- * Returns subdirectory of job working directory for task execution.
- *
- * @param locNodeId Local node ID.
- * @param info Task info.
- * @return Working directory for task.
- * @throws IgniteCheckedException If Failed.
- */
- public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws IgniteCheckedException {
- File jobLocDir = jobLocalDir(locNodeId, info.jobId());
-
- return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
- }
-
- /**
- * Constructor.
- */
- private GridHadoopUtils() {
- // No-op.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
new file mode 100644
index 0000000..1856e41
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+import org.objectweb.asm.*;
+import org.objectweb.asm.commons.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Class loader allowing explicitly load classes without delegation to parent class loader.
+ * Also supports class parsing for finding dependencies which contain transitive dependencies
+ * unavailable for parent.
+ */
+public class HadoopClassLoader extends URLClassLoader {
+ /**
+ * We are very parallel capable.
+ */
+ static {
+ registerAsParallelCapable();
+ }
+
+ /** */
+ private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader();
+
+ /** */
+ private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs());
+
+ /** */
+ private static volatile Collection<URL> hadoopJars;
+
+ /** */
+ private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>();
+
+ /** */
+ private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>();
+
+ /**
+ * @param urls Urls.
+ */
+ public HadoopClassLoader(URL[] urls) {
+ super(addHadoopUrls(urls), APP_CLS_LDR);
+
+ assert !(getParent() instanceof HadoopClassLoader);
+ }
+
+ /**
+ * Need to parse only Ignite Hadoop and IGFS classes.
+ *
+ * @param cls Class name.
+ * @return {@code true} if we need to check this class.
+ */
+ private static boolean isHadoopIgfs(String cls) {
+ String ignitePackagePrefix = "org.apache.ignite";
+ int len = ignitePackagePrefix.length();
+
+ return cls.startsWith(ignitePackagePrefix) && (cls.indexOf("igfs.", len) != -1 || cls.indexOf(".fs.", len) != -1 || cls.indexOf("hadoop.", len) != -1);
+ }
+
+ /**
+ * @param cls Class name.
+ * @return {@code true} If this is Hadoop class.
+ */
+ private static boolean isHadoop(String cls) {
+ return cls.startsWith("org.apache.hadoop.");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ try {
+ if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath.
+ if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks.
+ return loadFromBytes(name, HadoopShutdownHookManager.class.getName());
+ else if (name.endsWith(".util.NativeCodeLoader"))
+ return loadFromBytes(name, HadoopNativeCodeLoader.class.getName());
+
+ return loadClassExplicitly(name, resolve);
+ }
+
+ if (isHadoopIgfs(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop.
+ Boolean hasDeps = cache.get(name);
+
+ if (hasDeps == null) {
+ hasDeps = hasExternalDependencies(name, new HashSet<String>());
+
+ cache.put(name, hasDeps);
+ }
+
+ if (hasDeps)
+ return loadClassExplicitly(name, resolve);
+ }
+
+ return super.loadClass(name, resolve);
+ }
+ catch (NoClassDefFoundError | ClassNotFoundException e) {
+ throw new ClassNotFoundException("Failed to load class: " + name, e);
+ }
+ }
+
+ /**
+ * @param name Name.
+ * @param replace Replacement.
+ * @return Class.
+ */
+ private Class<?> loadFromBytes(final String name, final String replace) {
+ synchronized (getClassLoadingLock(name)) {
+ // First, check if the class has already been loaded
+ Class c = findLoadedClass(name);
+
+ if (c != null)
+ return c;
+
+ byte[] bytes = bytesCache.get(name);
+
+ if (bytes == null) {
+ InputStream in = loadClassBytes(getParent(), replace);
+
+ ClassReader rdr;
+
+ try {
+ rdr = new ClassReader(in);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ ClassWriter w = new ClassWriter(Opcodes.ASM4);
+
+ rdr.accept(new RemappingClassAdapter(w, new Remapper() {
+ /** */
+ String replaceType = replace.replace('.', '/');
+
+ /** */
+ String nameType = name.replace('.', '/');
+
+ @Override public String map(String type) {
+ if (type.equals(replaceType))
+ return nameType;
+
+ return type;
+ }
+ }), ClassReader.EXPAND_FRAMES);
+
+ bytes = w.toByteArray();
+
+ bytesCache.put(name, bytes);
+ }
+
+ return defineClass(name, bytes, 0, bytes.length);
+ }
+ }
+
+ /**
+ * @param name Class name.
+ * @param resolve Resolve class.
+ * @return Class.
+ * @throws ClassNotFoundException If failed.
+ */
+ private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException {
+ synchronized (getClassLoadingLock(name)) {
+ // First, check if the class has already been loaded
+ Class c = findLoadedClass(name);
+
+ if (c == null) {
+ long t1 = System.nanoTime();
+
+ c = findClass(name);
+
+ // this is the defining class loader; record the stats
+ sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
+ sun.misc.PerfCounter.getFindClasses().increment();
+ }
+
+ if (resolve)
+ resolveClass(c);
+
+ return c;
+ }
+ }
+
+ /**
+ * @param ldr Loader.
+ * @param clsName Class.
+ * @return Input stream.
+ */
+ @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) {
+ return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class");
+ }
+
+ /**
+ * @param clsName Class name.
+ * @return {@code true} If the class has external dependencies.
+ */
+ boolean hasExternalDependencies(final String clsName, final Set<String> visited) {
+ if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external.
+ return true;
+
+ // Try to get from parent to check if the type accessible.
+ InputStream in = loadClassBytes(getParent(), clsName);
+
+ if (in == null) // The class is external itself, it must be loaded from this class loader.
+ return true;
+
+ if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies.
+ return false;
+
+ final ClassReader rdr;
+
+ try {
+ rdr = new ClassReader(in);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to read class: " + clsName, e);
+ }
+
+ visited.add(clsName);
+
+ final AtomicBoolean hasDeps = new AtomicBoolean();
+
+ rdr.accept(new ClassVisitor(Opcodes.ASM4) {
+ AnnotationVisitor av = new AnnotationVisitor(Opcodes.ASM4) {
+ // TODO
+ };
+
+ FieldVisitor fv = new FieldVisitor(Opcodes.ASM4) {
+ @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) {
+ onType(desc);
+
+ return av;
+ }
+ };
+
+ MethodVisitor mv = new MethodVisitor(Opcodes.ASM4) {
+ @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) {
+ onType(desc);
+
+ return av;
+ }
+
+ @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) {
+ onType(desc);
+
+ return av;
+ }
+
+ @Override public AnnotationVisitor visitAnnotationDefault() {
+ return av;
+ }
+
+ @Override public void visitFieldInsn(int i, String owner, String name, String desc) {
+ onType(owner);
+ onType(desc);
+ }
+
+ @Override public void visitFrame(int i, int i2, Object[] locTypes, int i3, Object[] stackTypes) {
+ for (Object o : locTypes) {
+ if (o instanceof String)
+ onType((String)o);
+ }
+
+ for (Object o : stackTypes) {
+ if (o instanceof String)
+ onType((String)o);
+ }
+ }
+
+ @Override public void visitLocalVariable(String name, String desc, String signature, Label lb,
+ Label lb2, int i) {
+ onType(desc);
+ }
+
+ @Override public void visitMethodInsn(int i, String owner, String name, String desc) {
+ onType(owner);
+ }
+
+ @Override public void visitMultiANewArrayInsn(String desc, int dim) {
+ onType(desc);
+ }
+
+ @Override public void visitTryCatchBlock(Label lb, Label lb2, Label lb3, String e) {
+ onType(e);
+ }
+ };
+
+ void onClass(String depCls) {
+ assert validateClassName(depCls) : depCls;
+
+ if (depCls.startsWith("java.")) // Filter out platform classes.
+ return;
+
+ if (visited.contains(depCls))
+ return;
+
+ Boolean res = cache.get(depCls);
+
+ if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, visited)))
+ hasDeps.set(true);
+ }
+
+ void onType(String type) {
+ if (type == null)
+ return;
+
+ int off = 0;
+
+ while (type.charAt(off) == '[')
+ off++; // Handle arrays.
+
+ if (off != 0)
+ type = type.substring(off);
+
+ if (type.length() == 1)
+ return; // Get rid of primitives.
+
+ if (type.charAt(type.length() - 1) == ';') {
+ assert type.charAt(0) == 'L' : type;
+
+ type = type.substring(1, type.length() - 1);
+ }
+
+ type = type.replace('/', '.');
+
+ onClass(type);
+ }
+
+ @Override public void visit(int i, int i2, String name, String signature, String superName,
+ String[] ifaces) {
+ onType(superName);
+
+ if (ifaces != null) {
+ for (String iface : ifaces)
+ onType(iface);
+ }
+ }
+
+ @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
+ onType(desc);
+
+ return av;
+ }
+
+ @Override public void visitInnerClass(String name, String outerName, String innerName, int i) {
+ onType(name);
+ }
+
+ @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) {
+ onType(desc);
+
+ return fv;
+ }
+
+ @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature,
+ String[] exceptions) {
+ if (exceptions != null) {
+ for (String e : exceptions)
+ onType(e);
+ }
+
+ return mv;
+ }
+ }, 0);
+
+ if (hasDeps.get()) // We already know that we have dependencies, no need to check parent.
+ return true;
+
+ // Here we are known to not have any dependencies but possibly we have a parent which have them.
+ int idx = clsName.lastIndexOf('$');
+
+ if (idx == -1) // No parent class.
+ return false;
+
+ String parentCls = clsName.substring(0, idx);
+
+ if (visited.contains(parentCls))
+ return false;
+
+ Boolean res = cache.get(parentCls);
+
+ if (res == null)
+ res = hasExternalDependencies(parentCls, visited);
+
+ return res;
+ }
+
+ /**
+ * @param name Class name.
+ * @return {@code true} If this is a valid class name.
+ */
+ private static boolean validateClassName(String name) {
+ int len = name.length();
+
+ if (len <= 1)
+ return false;
+
+ if (!Character.isJavaIdentifierStart(name.charAt(0)))
+ return false;
+
+ boolean hasDot = false;
+
+ for (int i = 1; i < len; i++) {
+ char c = name.charAt(i);
+
+ if (c == '.')
+ hasDot = true;
+ else if (!Character.isJavaIdentifierPart(c))
+ return false;
+ }
+
+ return hasDot;
+ }
+
+ /**
+ * @param name Variable name.
+ * @param dflt Default.
+ * @return Value.
+ */
+ private static String getEnv(String name, String dflt) {
+ String res = System.getProperty(name);
+
+ if (F.isEmpty(res))
+ res = System.getenv(name);
+
+ return F.isEmpty(res) ? dflt : res;
+ }
+
+ /**
+ * @param res Result.
+ * @param dir Directory.
+ * @param startsWith Starts with prefix.
+ * @throws MalformedURLException If failed.
+ */
+ private static void addUrls(Collection<URL> res, File dir, final String startsWith) throws Exception {
+ File[] files = dir.listFiles(new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return startsWith == null || name.startsWith(startsWith);
+ }
+ });
+
+ if (files == null)
+ throw new IOException("Path is not a directory: " + dir);
+
+ for (File file : files)
+ res.add(file.toURI().toURL());
+ }
+
+ /**
+ * @param urls URLs.
+ * @return URLs.
+ */
+ private static URL[] addHadoopUrls(URL[] urls) {
+ Collection<URL> hadoopJars;
+
+ try {
+ hadoopJars = hadoopUrls();
+ }
+ catch (IgniteCheckedException e) {
+ throw new RuntimeException(e);
+ }
+
+ ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length));
+
+ list.addAll(appJars);
+ list.addAll(hadoopJars);
+
+ if (!F.isEmpty(urls))
+ list.addAll(F.asList(urls));
+
+ return list.toArray(new URL[list.size()]);
+ }
+
+ /**
+ * @return HADOOP_HOME Variable.
+ */
+ @Nullable public static String hadoopHome() {
+ return getEnv("HADOOP_PREFIX", getEnv("HADOOP_HOME", null));
+ }
+
+ /**
+ * @return Collection of jar URLs.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static Collection<URL> hadoopUrls() throws IgniteCheckedException {
+ Collection<URL> hadoopUrls = hadoopJars;
+
+ if (hadoopUrls != null)
+ return hadoopUrls;
+
+ synchronized (HadoopClassLoader.class) {
+ hadoopUrls = hadoopJars;
+
+ if (hadoopUrls != null)
+ return hadoopUrls;
+
+ hadoopUrls = new ArrayList<>();
+
+ String hadoopPrefix = hadoopHome();
+
+ if (F.isEmpty(hadoopPrefix))
+ throw new IgniteCheckedException("Failed resolve Hadoop installation location. Either HADOOP_PREFIX or " +
+ "HADOOP_HOME environment variables must be set.");
+
+ String commonHome = getEnv("HADOOP_COMMON_HOME", hadoopPrefix + "/share/hadoop/common");
+ String hdfsHome = getEnv("HADOOP_HDFS_HOME", hadoopPrefix + "/share/hadoop/hdfs");
+ String mapredHome = getEnv("HADOOP_MAPRED_HOME", hadoopPrefix + "/share/hadoop/mapreduce");
+
+ try {
+ addUrls(hadoopUrls, new File(commonHome + "/lib"), null);
+ addUrls(hadoopUrls, new File(hdfsHome + "/lib"), null);
+ addUrls(hadoopUrls, new File(mapredHome + "/lib"), null);
+
+ addUrls(hadoopUrls, new File(hdfsHome), "hadoop-hdfs-");
+
+ addUrls(hadoopUrls, new File(commonHome), "hadoop-common-");
+ addUrls(hadoopUrls, new File(commonHome), "hadoop-auth-");
+ addUrls(hadoopUrls, new File(commonHome + "/lib"), "hadoop-auth-");
+
+ addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-common");
+ addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-core");
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ hadoopJars = hadoopUrls;
+
+ return hadoopUrls;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
new file mode 100644
index 0000000..cea11eb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * Abstract class for all hadoop components.
+ */
+public abstract class HadoopComponent {
+ /** Hadoop context. */
+ protected HadoopContext ctx;
+
+ /** Logger. */
+ protected IgniteLogger log;
+
+ /**
+ * @param ctx Hadoop context.
+ */
+ public void start(HadoopContext ctx) throws IgniteCheckedException {
+ this.ctx = ctx;
+
+ log = ctx.kernalContext().log(getClass());
+ }
+
+ /**
+ * Stops manager.
+ */
+ public void stop(boolean cancel) {
+ // No-op.
+ }
+
+ /**
+ * Callback invoked when all grid components are started.
+ */
+ public void onKernalStart() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
+ * Callback invoked before all grid components are stopped.
+ */
+ public void onKernalStop(boolean cancel) {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
new file mode 100644
index 0000000..68f0baf
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Hadoop accelerator context.
+ */
+public class HadoopContext {
+ /** Kernal context. */
+ private GridKernalContext ctx;
+
+ /** Hadoop configuration. */
+ private HadoopConfiguration cfg;
+
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** External task executor. */
+ private HadoopTaskExecutorAdapter taskExecutor;
+
+ /** */
+ private HadoopShuffle shuffle;
+
+ /** Managers list. */
+ private List<HadoopComponent> components = new ArrayList<>();
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public HadoopContext(
+ GridKernalContext ctx,
+ HadoopConfiguration cfg,
+ HadoopJobTracker jobTracker,
+ HadoopTaskExecutorAdapter taskExecutor,
+ HadoopShuffle shuffle
+ ) {
+ this.ctx = ctx;
+ this.cfg = cfg;
+
+ this.jobTracker = add(jobTracker);
+ this.taskExecutor = add(taskExecutor);
+ this.shuffle = add(shuffle);
+ }
+
+ /**
+ * Gets list of managers.
+ *
+ * @return List of managers.
+ */
+ public List<HadoopComponent> components() {
+ return components;
+ }
+
+ /**
+ * Gets kernal context.
+ *
+ * @return Grid kernal context instance.
+ */
+ public GridKernalContext kernalContext() {
+ return ctx;
+ }
+
+ /**
+ * Gets Hadoop configuration.
+ *
+ * @return Hadoop configuration.
+ */
+ public HadoopConfiguration configuration() {
+ return cfg;
+ }
+
+ /**
+ * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}.
+ *
+ * @return Local node ID.
+ */
+ public UUID localNodeId() {
+ return ctx.localNodeId();
+ }
+
+ /**
+ * Gets local node order.
+ *
+ * @return Local node order.
+ */
+ public long localNodeOrder() {
+ assert ctx.discovery() != null;
+
+ return ctx.discovery().localNode().order();
+ }
+
+ /**
+ * @return Hadoop-enabled nodes.
+ */
+ public Collection<ClusterNode> nodes() {
+ return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersion());
+ }
+
+ /**
+ * @return {@code True} if
+ */
+ public boolean jobUpdateLeader() {
+ long minOrder = Long.MAX_VALUE;
+ ClusterNode minOrderNode = null;
+
+ for (ClusterNode node : nodes()) {
+ if (node.order() < minOrder) {
+ minOrder = node.order();
+ minOrderNode = node;
+ }
+ }
+
+ assert minOrderNode != null;
+
+ return localNodeId().equals(minOrderNode.id());
+ }
+
+ /**
+ * @param meta Job metadata.
+ * @return {@code true} If local node is participating in job execution.
+ */
+ public boolean isParticipating(HadoopJobMetadata meta) {
+ UUID locNodeId = localNodeId();
+
+ if (locNodeId.equals(meta.submitNodeId()))
+ return true;
+
+ HadoopMapReducePlan plan = meta.mapReducePlan();
+
+ return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
+ }
+
+ /**
+ * @return Jon tracker instance.
+ */
+ public HadoopJobTracker jobTracker() {
+ return jobTracker;
+ }
+
+ /**
+ * @return Task executor.
+ */
+ public HadoopTaskExecutorAdapter taskExecutor() {
+ return taskExecutor;
+ }
+
+ /**
+ * @return Shuffle.
+ */
+ public HadoopShuffle shuffle() {
+ return shuffle;
+ }
+
+ /**
+ * @return Map-reduce planner.
+ */
+ public HadoopMapReducePlanner planner() {
+ return cfg.getMapReducePlanner();
+ }
+
+ /**
+ * Adds component.
+ *
+ * @param c Component to add.
+ * @return Added manager.
+ */
+ private <C extends HadoopComponent> C add(C c) {
+ components.add(c);
+
+ return c;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
new file mode 100644
index 0000000..77eb6d2
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.*;
+
+/**
+ * Hadoop job info based on default Hadoop configuration.
+ */
+public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
+ /** */
+ private static final long serialVersionUID = 5489900236464999951L;
+
+ /** {@code true} If job has combiner. */
+ private boolean hasCombiner;
+
+ /** Number of reducers configured for job. */
+ private int numReduces;
+
+ /** Configuration. */
+ private Map<String,String> props = new HashMap<>();
+
+ /** Job name. */
+ private String jobName;
+
+ /** User name. */
+ private String user;
+
+ /** */
+ private static volatile Class<?> jobCls;
+
+ /**
+ * Default constructor required by {@link Externalizable}.
+ */
+ public HadoopDefaultJobInfo() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param jobName Job name.
+ * @param user User name.
+ * @param hasCombiner {@code true} If job has combiner.
+ * @param numReduces Number of reducers configured for job.
+ * @param props All other properties of the job.
+ */
+ public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces,
+ Map<String, String> props) {
+ this.jobName = jobName;
+ this.user = user;
+ this.hasCombiner = hasCombiner;
+ this.numReduces = numReduces;
+ this.props = props;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String property(String name) {
+ return props.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
+ try {
+ Class<?> jobCls0 = jobCls;
+
+ if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
+ synchronized (HadoopDefaultJobInfo.class) {
+ if ((jobCls0 = jobCls) == null) {
+ HadoopClassLoader ldr = new HadoopClassLoader(null);
+
+ jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
+ }
+ }
+ }
+
+ Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class,
+ IgniteLogger.class);
+
+ return (HadoopJob)constructor.newInstance(jobId, this, log);
+ }
+ // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call.
+ catch (Throwable t) {
+ throw new IgniteCheckedException(t);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasCombiner() {
+ return hasCombiner;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasReducer() {
+ return reducers() > 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int reducers() {
+ return numReduces;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String jobName() {
+ return jobName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return user;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, jobName);
+ U.writeString(out, user);
+
+ out.writeBoolean(hasCombiner);
+ out.writeInt(numReduces);
+
+ U.writeStringMap(out, props);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobName = U.readString(in);
+ user = U.readString(in);
+
+ hasCombiner = in.readBoolean();
+ numReduces = in.readInt();
+
+ props = U.readStringMap(in);
+ }
+
+ /**
+ * @return Properties of the job.
+ */
+ public Map<String, String> properties() {
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
new file mode 100644
index 0000000..27542a1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Hadoop facade implementation.
+ */
+public class HadoopImpl implements Hadoop {
+ /** Hadoop processor. */
+ private final HadoopProcessor proc;
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /**
+ * Constructor.
+ *
+ * @param proc Hadoop processor.
+ */
+ HadoopImpl(HadoopProcessor proc) {
+ this.proc = proc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration configuration() {
+ return proc.config();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobId nextJobId() {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.nextJobId();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.submit(jobId, jobInfo);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to submit job (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.status(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job status (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.counters(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job counters (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.finishFuture(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.kill(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to kill job (grid is stopping).");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..b0c2d3e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.counters.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop +counter group adapter.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+ /** Counters. */
+ private final HadoopMapReduceCounters cntrs;
+
+ /** Group name. */
+ private final String name;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntrs Client counters instance.
+ * @param name Group name.
+ */
+ HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+ this.cntrs = cntrs;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addCounter(Counter counter) {
+ addCounter(counter.getName(), counter.getDisplayName(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter addCounter(String name, String displayName, long value) {
+ final Counter counter = cntrs.findCounter(this.name, name);
+
+ counter.setValue(value);
+
+ return counter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, String displayName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, boolean create) {
+ return cntrs.findCounter(name, counterName, create);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return cntrs.groupSize(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+ for (final Counter counter : rightGroup)
+ cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Counter> iterator() {
+ return cntrs.iterateGroup(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
new file mode 100644
index 0000000..c2c9e2a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.counters.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop counters adapter.
+ */
+public class HadoopMapReduceCounters extends Counters {
+ /** */
+ private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
+
+ /**
+ * Creates new instance based on given counters.
+ *
+ * @param cntrs Counters to adapt.
+ */
+ public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
+ for (HadoopCounter cntr : cntrs.all())
+ if (cntr instanceof HadoopLongCounter)
+ this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+ return addGroup(grp.getName(), grp.getDisplayName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroup addGroup(String name, String displayName) {
+ return new HadoopMapReduceCounterGroup(this, name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String grpName, String cntrName) {
+ return findCounter(grpName, cntrName, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(Enum<?> key) {
+ return findCounter(key.getDeclaringClass().getName(), key.name(), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
+ return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Iterable<String> getGroupNames() {
+ Collection<String> res = new HashSet<>();
+
+ for (HadoopCounter counter : cntrs.values())
+ res.add(counter.group());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<CounterGroup> iterator() {
+ final Iterator<String> iter = getGroupNames().iterator();
+
+ return new Iterator<CounterGroup>() {
+ @Override public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override public CounterGroup next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException("not implemented");
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup getGroup(String grpName) {
+ return new HadoopMapReduceCounterGroup(this, grpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int countCounters() {
+ return cntrs.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+ for (CounterGroup group : other) {
+ for (Counter counter : group) {
+ findCounter(group.getName(), counter.getName()).increment(counter.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object genericRight) {
+ if (!(genericRight instanceof HadoopMapReduceCounters))
+ return false;
+
+ return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return cntrs.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWriteAllCounters(boolean snd) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean getWriteAllCounters() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Limits limits() {
+ return null;
+ }
+
+ /**
+ * Returns size of a group.
+ *
+ * @param grpName Name of the group.
+ * @return amount of counters in the given group.
+ */
+ public int groupSize(String grpName) {
+ int res = 0;
+
+ for (HadoopCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ res++;
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns counters iterator for specified group.
+ *
+ * @param grpName Name of the group to iterate.
+ * @return Counters iterator.
+ */
+ public Iterator<Counter> iterateGroup(String grpName) {
+ Collection<Counter> grpCounters = new ArrayList<>();
+
+ for (HadoopLongCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ grpCounters.add(new HadoopV2Counter(counter));
+ }
+
+ return grpCounters.iterator();
+ }
+
+ /**
+ * Find a counter in the group.
+ *
+ * @param grpName The name of the counter group.
+ * @param cntrName The name of the counter.
+ * @param create Create the counter if not found if true.
+ * @return The counter that was found or added or {@code null} if create is false.
+ */
+ public Counter findCounter(String grpName, String cntrName, boolean create) {
+ T2<String, String> key = new T2<>(grpName, cntrName);
+
+ HadoopLongCounter internalCntr = cntrs.get(key);
+
+ if (internalCntr == null & create) {
+ internalCntr = new HadoopLongCounter(grpName,cntrName);
+
+ cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
+ }
+
+ return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
+ }
+}