You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:53:42 UTC
[50/92] [abbrv] [partial] ignite git commit: Moving classes around.
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
deleted file mode 100644
index 57a853f..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.FileSystemCounter;
-import org.apache.hadoop.mapreduce.counters.AbstractCounters;
-import org.apache.hadoop.mapreduce.counters.Limits;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
-import org.apache.ignite.internal.util.typedef.T2;
-
-/**
- * Hadoop counters adapter.
- */
-public class HadoopMapReduceCounters extends Counters {
- /** */
- private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
-
- /**
- * Creates new instance based on given counters.
- *
- * @param cntrs Counters to adapt.
- */
- public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
- for (HadoopCounter cntr : cntrs.all())
- if (cntr instanceof HadoopLongCounter)
- this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
- return addGroup(grp.getName(), grp.getDisplayName());
- }
-
- /** {@inheritDoc} */
- @Override public CounterGroup addGroup(String name, String displayName) {
- return new HadoopMapReduceCounterGroup(this, name);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String grpName, String cntrName) {
- return findCounter(grpName, cntrName, true);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Counter findCounter(Enum<?> key) {
- return findCounter(key.getDeclaringClass().getName(), key.name(), true);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
- return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Iterable<String> getGroupNames() {
- Collection<String> res = new HashSet<>();
-
- for (HadoopCounter counter : cntrs.values())
- res.add(counter.group());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<CounterGroup> iterator() {
- final Iterator<String> iter = getGroupNames().iterator();
-
- return new Iterator<CounterGroup>() {
- @Override public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override public CounterGroup next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException("not implemented");
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CounterGroup getGroup(String grpName) {
- return new HadoopMapReduceCounterGroup(this, grpName);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int countCounters() {
- return cntrs.size();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
- for (CounterGroup group : other) {
- for (Counter counter : group) {
- findCounter(group.getName(), counter.getName()).increment(counter.getValue());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object genericRight) {
- if (!(genericRight instanceof HadoopMapReduceCounters))
- return false;
-
- return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return cntrs.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public void setWriteAllCounters(boolean snd) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean getWriteAllCounters() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Limits limits() {
- return null;
- }
-
- /**
- * Returns size of a group.
- *
- * @param grpName Name of the group.
- * @return amount of counters in the given group.
- */
- public int groupSize(String grpName) {
- int res = 0;
-
- for (HadoopCounter counter : cntrs.values()) {
- if (grpName.equals(counter.group()))
- res++;
- }
-
- return res;
- }
-
- /**
- * Returns counters iterator for specified group.
- *
- * @param grpName Name of the group to iterate.
- * @return Counters iterator.
- */
- public Iterator<Counter> iterateGroup(String grpName) {
- Collection<Counter> grpCounters = new ArrayList<>();
-
- for (HadoopLongCounter counter : cntrs.values()) {
- if (grpName.equals(counter.group()))
- grpCounters.add(new HadoopV2Counter(counter));
- }
-
- return grpCounters.iterator();
- }
-
- /**
- * Find a counter in the group.
- *
- * @param grpName The name of the counter group.
- * @param cntrName The name of the counter.
- * @param create Create the counter if not found if true.
- * @return The counter that was found or added or {@code null} if create is false.
- */
- public Counter findCounter(String grpName, String cntrName, boolean create) {
- T2<String, String> key = new T2<>(grpName, cntrName);
-
- HadoopLongCounter internalCntr = cntrs.get(key);
-
- if (internalCntr == null & create) {
- internalCntr = new HadoopLongCounter(grpName,cntrName);
-
- cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
- }
-
- return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
deleted file mode 100644
index b9c20c3..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Hadoop processor.
- */
-public class HadoopProcessor extends HadoopProcessorAdapter {
- /** Job ID counter. */
- private final AtomicInteger idCtr = new AtomicInteger();
-
- /** Hadoop context. */
- @GridToStringExclude
- private HadoopContext hctx;
-
- /** Hadoop facade for public API. */
- @GridToStringExclude
- private Hadoop hadoop;
-
- /**
- * Constructor.
- *
- * @param ctx Kernal context.
- */
- public HadoopProcessor(GridKernalContext ctx) {
- super(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- if (ctx.isDaemon())
- return;
-
- HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
-
- if (cfg == null)
- cfg = new HadoopConfiguration();
- else
- cfg = new HadoopConfiguration(cfg);
-
- initializeDefaults(cfg);
-
- hctx = new HadoopContext(
- ctx,
- cfg,
- new HadoopJobTracker(),
- new HadoopEmbeddedTaskExecutor(),
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
- new HadoopShuffle());
-
- for (HadoopComponent c : hctx.components())
- c.start(hctx);
-
- hadoop = new HadoopImpl(this);
-
- ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- if (hctx == null)
- return;
-
- for (HadoopComponent c : hctx.components())
- c.onKernalStart();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- if (hctx == null)
- return;
-
- List<HadoopComponent> components = hctx.components();
-
- for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
- HadoopComponent c = it.previous();
-
- c.onKernalStop(cancel);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- super.stop(cancel);
-
- if (hctx == null)
- return;
-
- List<HadoopComponent> components = hctx.components();
-
- for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
- HadoopComponent c = it.previous();
-
- c.stop(cancel);
- }
- }
-
- /**
- * Gets Hadoop context.
- *
- * @return Hadoop context.
- */
- public HadoopContext context() {
- return hctx;
- }
-
- /** {@inheritDoc} */
- @Override public Hadoop hadoop() {
- if (hadoop == null)
- throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
- "is HADOOP_HOME environment variable set?)");
-
- return hadoop;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration config() {
- return hctx.configuration();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobId nextJobId() {
- return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
- return hctx.jobTracker().submit(jobId, jobInfo);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().status(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().jobCounters(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().finishFuture(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().killJob(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public void validateEnvironment() throws IgniteCheckedException {
- // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here.
- try {
- HadoopLocations loc = HadoopClasspathUtils.locations();
-
- if (!F.isEmpty(loc.home()))
- U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home());
-
- U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " +
- loc.mapred());
- }
- catch (IOException ioe) {
- throw new IgniteCheckedException(ioe.getMessage(), ioe);
- }
-
- HadoopClassLoader.hadoopUrls();
- }
-
- /**
- * Initializes default hadoop configuration.
- *
- * @param cfg Hadoop configuration.
- */
- private void initializeDefaults(HadoopConfiguration cfg) {
- if (cfg.getMapReducePlanner() == null)
- cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopProcessor.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
deleted file mode 100644
index ed39ce5..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Scanner;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
-import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
-
-/**
- * Setup tool to configure Hadoop client.
- */
-public class HadoopSetup {
- /** */
- public static final String WINUTILS_EXE = "winutils.exe";
-
- /** */
- private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return name.startsWith("ignite-") && name.endsWith(".jar");
- }
- };
-
- /**
- * The main method.
- * @param ignore Params.
- */
- public static void main(String[] ignore) {
- X.println(
- " __________ ________________ ",
- " / _/ ___/ |/ / _/_ __/ __/ ",
- " _/ // (7 7 // / / / / _/ ",
- "/___/\\___/_/|_/___/ /_/ /___/ ",
- " for Apache Hadoop ",
- " ",
- "ver. " + ACK_VER_STR,
- COPYRIGHT);
-
- configureHadoop();
- }
-
- /**
- * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop.
- * It performs these operations:
- * <ul>
- * <li>Check for setting of HADOOP_HOME environment variable.</li>
- * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
- * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li>
- * <li>In Windows check new line character issues in CMD scripts.</li>
- * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li>
- * </ul>
- */
- private static void configureHadoop() {
- String igniteHome = U.getIgniteHome();
-
- println("IGNITE_HOME is set to '" + igniteHome + "'.");
-
- checkIgniteHome(igniteHome);
-
- String homeVar = "HADOOP_HOME";
- String hadoopHome = System.getenv(homeVar);
-
- if (F.isEmpty(hadoopHome)) {
- homeVar = "HADOOP_PREFIX";
- hadoopHome = System.getenv(homeVar);
- }
-
- if (F.isEmpty(hadoopHome))
- exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
- "valid Hadoop installation directory and run setup tool again.", null);
-
- hadoopHome = hadoopHome.replaceAll("\"", "");
-
- println(homeVar + " is set to '" + hadoopHome + "'.");
-
- String hiveHome = System.getenv("HIVE_HOME");
-
- if (!F.isEmpty(hiveHome)) {
- hiveHome = hiveHome.replaceAll("\"", "");
-
- println("HIVE_HOME is set to '" + hiveHome + "'.");
- }
-
- File hadoopDir = new File(hadoopHome);
-
- if (!hadoopDir.exists())
- exit("Hadoop installation folder does not exist.", null);
-
- if (!hadoopDir.isDirectory())
- exit("HADOOP_HOME must point to a directory.", null);
-
- if (!hadoopDir.canRead())
- exit("Hadoop installation folder can not be read. Please check permissions.", null);
-
- final File hadoopCommonDir;
-
- String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
-
- if (F.isEmpty(hadoopCommonHome)) {
- hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
-
- println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
- }
- else {
- println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
-
- hadoopCommonDir = new File(hadoopCommonHome);
- }
-
- if (!hadoopCommonDir.canRead())
- exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null);
-
- final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
-
- if (!hadoopCommonLibDir.canRead())
- exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
-
- if (U.isWindows()) {
- checkJavaPathSpaces();
-
- final File hadoopBinDir = new File(hadoopDir, "bin");
-
- if (!hadoopBinDir.canRead())
- exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
-
- File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
-
- if (!winutilsFile.exists()) {
- if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
- "It may be replaced by a stub. Create it?")) {
- println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
-
- boolean ok = false;
-
- try {
- ok = winutilsFile.createNewFile();
- }
- catch (IOException ignore) {
- // No-op.
- }
-
- if (!ok)
- exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
- }
- else
- println("Ok. But Hadoop client probably will not work on Windows this way...");
- }
-
- processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
- }
-
- File igniteLibs = new File(new File(igniteHome), "libs");
-
- if (!igniteLibs.exists())
- exit("Ignite 'libs' folder is not found.", null);
-
- Collection<File> jarFiles = new ArrayList<>();
-
- addJarsInFolder(jarFiles, igniteLibs);
- addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
- addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
-
- boolean jarsLinksCorrect = true;
-
- for (File file : jarFiles) {
- File link = new File(hadoopCommonLibDir, file.getName());
-
- jarsLinksCorrect &= isJarLinkCorrect(link, file);
-
- if (!jarsLinksCorrect)
- break;
- }
-
- if (!jarsLinksCorrect) {
- if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
- "Create appropriate symbolic links?")) {
- File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);
-
- if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
- "installation. They must be deleted to continue. Continue?")) {
- for (File file : oldIgniteJarFiles) {
- println("Deleting file '" + file.getAbsolutePath() + "'.");
-
- if (!file.delete())
- exit("Failed to delete file '" + file.getPath() + "'.", null);
- }
- }
-
- for (File file : jarFiles) {
- File targetFile = new File(hadoopCommonLibDir, file.getName());
-
- try {
- println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");
-
- Files.createSymbolicLink(targetFile.toPath(), file.toPath());
- }
- catch (IOException e) {
- if (U.isWindows()) {
- warn("Ability to create symbolic links is required!");
- warn("On Windows platform you have to grant permission 'Create symbolic links'");
- warn("to your user or run the Accelerator as Administrator.");
- }
-
- exit("Creating symbolic link failed! Check permissions.", e);
- }
- }
- }
- else
- println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
- }
-
- File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");
-
- File igniteHadoopCfg = igniteHadoopConfig(igniteHome);
-
- if (!igniteHadoopCfg.canRead())
- exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null);
-
- if (hadoopEtc.canWrite()) { // TODO Bigtop
- if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
- "(existing files will be backed up)?")) {
- replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"),
- new File(hadoopEtc, "core-site.xml"));
-
- replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"),
- new File(hadoopEtc, "mapred-site.xml"));
- }
- else
- println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
- }
-
- if (!F.isEmpty(hiveHome)) {
- File hiveConfDir = new File(hiveHome + File.separator + "conf");
-
- if (!hiveConfDir.canWrite())
- warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
- "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
- else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
- replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"),
- new File(hiveConfDir, "hive-site.xml"));
- else
- println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
- }
-
- println("Apache Hadoop setup is complete.");
- }
-
- /**
- * Get Ignite Hadoop config directory.
- *
- * @param igniteHome Ignite home.
- * @return Ignite Hadoop config directory.
- */
- private static File igniteHadoopConfig(String igniteHome) {
- Path path = Paths.get(igniteHome, "modules", "hadoop", "config");
-
- if (!Files.exists(path))
- path = Paths.get(igniteHome, "config", "hadoop");
-
- if (Files.exists(path))
- return path.toFile();
- else
- return new File(igniteHome, "docs");
- }
-
- /**
- * @param jarFiles Jars.
- * @param folder Folder.
- */
- private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
- if (!folder.exists())
- exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
-
- jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS)));
- }
-
- /**
- * Checks that JAVA_HOME does not contain space characters.
- */
- private static void checkJavaPathSpaces() {
- String javaHome = System.getProperty("java.home");
-
- if (javaHome.contains(" ")) {
- warn("Java installation path contains space characters!");
- warn("Hadoop client will not be able to start using '" + javaHome + "'.");
- warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
- }
- }
-
- /**
- * Checks Ignite home.
- *
- * @param igniteHome Ignite home.
- */
- private static void checkIgniteHome(String igniteHome) {
- URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();
-
- try {
- Path jar = Paths.get(jarUrl.toURI());
- Path igHome = Paths.get(igniteHome);
-
- if (!jar.startsWith(igHome))
- exit("Ignite JAR files are not under IGNITE_HOME.", null);
- }
- catch (Exception e) {
- exit(e.getMessage(), e);
- }
- }
-
- /**
- * Replaces target file with source file.
- *
- * @param from From.
- * @param to To.
- */
- private static void replaceWithBackup(File from, File to) {
- if (!from.canRead())
- exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
-
- println("Replacing file '" + to.getAbsolutePath() + "'.");
-
- try {
- U.copy(from, renameToBak(to), true);
- }
- catch (IOException e) {
- exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
- }
- }
-
- /**
- * Renames file for backup.
- *
- * @param file File.
- * @return File.
- */
- private static File renameToBak(File file) {
- DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
-
- if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
- exit("Failed to rename file '" + file.getPath() + "'.", null);
-
- return file;
- }
-
- /**
- * Checks if link is correct.
- *
- * @param link Symbolic link.
- * @param correctTarget Correct link target.
- * @return {@code true} If link target is correct.
- */
- private static boolean isJarLinkCorrect(File link, File correctTarget) {
- if (!Files.isSymbolicLink(link.toPath()))
- return false; // It is a real file or it does not exist.
-
- Path target = null;
-
- try {
- target = Files.readSymbolicLink(link.toPath());
- }
- catch (IOException e) {
- exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
- }
-
- return Files.exists(target) && target.toFile().equals(correctTarget);
- }
-
- /**
- * Writes the question end read the boolean answer from the console.
- *
- * @param question Question to write.
- * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
- */
- private static boolean ask(String question) {
- X.println();
- X.print(" < " + question + " (Y/N): ");
-
- String answer = null;
-
- if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
- answer = "Y";
- else {
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
-
- try {
- answer = br.readLine();
- }
- catch (IOException e) {
- exit("Failed to read answer: " + e.getMessage(), e);
- }
- }
-
- if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
- X.println(" > Yes.");
-
- return true;
- }
- else {
- X.println(" > No.");
-
- return false;
- }
- }
-
- /**
- * Exit with message.
- *
- * @param msg Exit message.
- */
- private static void exit(String msg, Exception e) {
- X.println(" ");
- X.println(" # " + msg);
- X.println(" # Setup failed, exiting... ");
-
- if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
- e.printStackTrace();
-
- System.exit(1);
- }
-
- /**
- * Prints message.
- *
- * @param msg Message.
- */
- private static void println(String msg) {
- X.println(" > " + msg);
- }
-
- /**
- * Prints warning.
- *
- * @param msg Message.
- */
- private static void warn(String msg) {
- X.println(" ! " + msg);
- }
-
- /**
- * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the
- * answer. If it's 'Y' then backups original files and corrects invalid new line characters.
- *
- * @param rootDir Root directory to process.
- * @param dirs Directories inside of the root to process.
- */
- private static void processCmdFiles(File rootDir, String... dirs) {
- boolean answer = false;
-
- for (String dir : dirs) {
- File subDir = new File(rootDir, dir);
-
- File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(".cmd");
- }
- });
-
- for (File file : cmdFiles) {
- String content = null;
-
- try (Scanner scanner = new Scanner(file)) {
- content = scanner.useDelimiter("\\Z").next();
- }
- catch (FileNotFoundException e) {
- exit("Failed to read file '" + file + "'.", e);
- }
-
- boolean invalid = false;
-
- for (int i = 0; i < content.length(); i++) {
- if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
- invalid = true;
-
- break;
- }
- }
-
- if (invalid) {
- answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?");
-
- if (!answer) {
- println("Ok. But Windows most probably will fail to execute them...");
-
- return;
- }
-
- println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
-
- renameToBak(file);
-
- try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
- for (int i = 0; i < content.length(); i++) {
- if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
- writer.write("\r");
-
- writer.write(content.charAt(i));
- }
- }
- catch (IOException e) {
- exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
- }
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
deleted file mode 100644
index 1dc8674..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.IgniteException;
-
-/**
- * Exception that is thrown when a task is being cancelled.
- */
-public class HadoopTaskCancelledException extends IgniteException {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param msg Exception message.
- */
- public HadoopTaskCancelledException(String msg) {
- super(msg);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
deleted file mode 100644
index 08c3cb5..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.PrintStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop utility methods.
- */
-public class HadoopUtils {
- /** Property to store timestamp of new job id request. */
- public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
-
- /** Property to store timestamp of response of new job id request. */
- public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
-
- /** Property to store timestamp of job submission. */
- public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
-
- /** Property to set custom writer of job statistics. */
- public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
-
- /** Staging constant. */
- private static final String STAGING_CONSTANT = ".staging";
-
- /** Old mapper class attribute. */
- private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
-
- /** Old reducer class attribute. */
- private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
-
- /**
- * Constructor.
- */
- private HadoopUtils() {
- // No-op.
- }
-
- /**
- * Serializes a native split and wraps it together with its class name and hosts.
- *
- * @param id Split ID.
- * @param split Split, expected to be a {@link Writable}.
- * @param hosts Hosts.
- * @return Wrapper holding the split class name and the serialized split bytes.
- * @throws IOException If failed.
- */
- public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
- ObjectOutput objOut = new ObjectOutputStream(buf);
-
- assert split instanceof Writable;
-
- Writable writable = (Writable)split;
-
- writable.write(objOut);
-
- // Push everything buffered by the object stream into the byte array before taking a snapshot.
- objOut.flush();
-
- String clsName = split.getClass().getName();
-
- return new HadoopSplitWrapper(id, clsName, buf.toByteArray(), hosts);
- }
-
- /**
- * Restores a native split from its wrapper: instantiates the split class by name and
- * lets it read its fields from the wrapped bytes.
- *
- * @param o Wrapper.
- * @return Split.
- * @throws IllegalStateException If the split class cannot be loaded, instantiated or read.
- */
- public static Object unwrapSplit(HadoopSplitWrapper o) {
- try {
- ClassLoader ldr = HadoopUtils.class.getClassLoader();
-
- Writable split = (Writable)ldr.loadClass(o.className()).newInstance();
-
- ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(o.bytes()));
-
- split.readFields(in);
-
- return split;
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- /**
- * Convert Ignite job status to Hadoop job status.
- *
- * @param status Ignite job status.
- * @param conf Configuration used to resolve the path of the job file.
- * @return Hadoop job status.
- */
- public static JobStatus status(HadoopJobStatus status, Configuration conf) {
- JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
-
- float setupProgress = 0;
- float mapProgress = 0;
- float reduceProgress = 0;
- float cleanupProgress = 0;
-
- JobStatus.State state = JobStatus.State.RUNNING;
-
- switch (status.jobPhase()) {
- case PHASE_SETUP:
- setupProgress = 0.42f;
-
- break;
-
- case PHASE_MAP:
- setupProgress = 1;
-
- // Same guard as for reducers: without it a job with no mappers would report NaN (0 / 0f) progress.
- if (status.totalMapperCnt() > 0)
- mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
- else
- mapProgress = 1f;
-
- break;
-
- case PHASE_REDUCE:
- setupProgress = 1;
- mapProgress = 1;
-
- if (status.totalReducerCnt() > 0)
- reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
- else
- reduceProgress = 1f;
-
- break;
-
- case PHASE_CANCELLING:
- case PHASE_COMPLETE:
- // Both phases are reported as terminal: succeeded unless the job is marked as failed.
- if (!status.isFailed()) {
- setupProgress = 1;
- mapProgress = 1;
- reduceProgress = 1;
- cleanupProgress = 1;
-
- state = JobStatus.State.SUCCEEDED;
- }
- else
- state = JobStatus.State.FAILED;
-
- break;
-
- default:
- assert false;
- }
-
- return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
- JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
- }
-
- /**
- * Builds the staging area directory path of the given user.
- *
- * @param conf Configuration.
- * @param usr User.
- * @return Staging area directory.
- */
- public static Path stagingAreaDir(Configuration conf, String usr) {
- String stagingRoot = conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR);
-
- return new Path(stagingRoot + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
- }
-
- /**
- * Builds the path of the job configuration file inside the user's staging area.
- *
- * @param conf Configuration.
- * @param usr User.
- * @param jobId Job ID.
- * @return Job file.
- */
- public static Path jobFile(Configuration conf, String usr, JobID jobId) {
- Path stagingDir = stagingAreaDir(conf, usr);
-
- String relPath = jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE;
-
- return new Path(stagingDir, relPath);
- }
-
- /**
- * Verifies that the given attribute is absent from the configuration.
- *
- * @param cfg Configuration to check.
- * @param attr Attribute name.
- * @param msg Mode name used in the exception message.
- * @throws IgniteCheckedException If attribute is set.
- */
- public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
- boolean isSet = cfg.get(attr) != null;
-
- if (isSet)
- throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
- }
-
- /**
- * Creates JobInfo from hadoop configuration, checking that attributes of the other MapReduce API mode are not set.
- *
- * @param cfg Hadoop configuration.
- * @return Job info.
- * @throws IgniteCheckedException If an attribute incompatible with the selected map or reduce mode is set.
- */
- public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
- JobConf jobConf = new JobConf(cfg);
-
- boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
- || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
-
- int numReduces = jobConf.getNumReduceTasks();
-
- jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null); // Unless set explicitly, pick the new map API only when no old mapper class is configured.
-
- if (jobConf.getUseNewMapper()) {
- String mode = "new map API";
-
- ensureNotSet(jobConf, "mapred.input.format.class", mode);
- ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
-
- if (numReduces != 0)
- ensureNotSet(jobConf, "mapred.partitioner.class", mode);
- else
- ensureNotSet(jobConf, "mapred.output.format.class", mode);
- }
- else {
- String mode = "map compatibility";
-
- ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
- ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
-
- if (numReduces != 0)
- ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
- else
- ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
- }
-
- if (numReduces != 0) { // Reducer settings are validated only when the job has reduce tasks.
- jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null); // Same defaulting rule as for the mapper.
-
- if (jobConf.getUseNewReducer()) {
- String mode = "new reduce API";
-
- ensureNotSet(jobConf, "mapred.output.format.class", mode);
- ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
- }
- else {
- String mode = "reduce compatibility";
-
- ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
- ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
- }
- }
-
- Map<String, String> props = new HashMap<>(); // Copy of all configuration entries, handed over to the job info.
-
- for (Map.Entry<String, String> entry : jobConf)
- props.put(entry.getKey(), entry.getValue());
-
- return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
- }
-
- /**
- * Creates new {@link IgniteCheckedException} whose message is the stack trace of the original exception
- * serialized into a string. This is needed to transfer error outside the current class loader.
- *
- * @param e Original exception.
- * @return IgniteCheckedException New exception.
- */
- public static IgniteCheckedException transformException(Throwable e) {
- // Print into a character buffer: going through bytes in the platform default charset
- // would replace characters that the charset cannot represent.
- java.io.StringWriter buf = new java.io.StringWriter();
-
- e.printStackTrace(new java.io.PrintWriter(buf, true));
-
- return new IgniteCheckedException(buf.toString());
- }
-
- /**
- * Resolves the local work directory used for execution of the given job on the given node.
- *
- * @param locNodeId Local node ID.
- * @param jobId Job ID.
- * @return Working directory for job.
- * @throws IgniteCheckedException If Failed.
- */
- public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
- File hadoopWorkDir = U.resolveWorkDirectory("hadoop", false);
-
- File nodeDir = new File(hadoopWorkDir, "node-" + locNodeId);
-
- return new File(nodeDir, "job_" + jobId);
- }
-
- /**
- * Resolves the task subdirectory inside the job working directory.
- *
- * @param locNodeId Local node ID.
- * @param info Task info.
- * @return Working directory for task.
- * @throws IgniteCheckedException If Failed.
- */
- public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
- File jobDir = jobLocalDir(locNodeId, info.jobId());
-
- // Directory name is "<type>_<task number>_<attempt>".
- String taskDirName = info.type() + "_" + info.taskNumber() + "_" + info.attempt();
-
- return new File(jobDir, taskDirName);
- }
-
- /**
- * Creates {@link Configuration} while the thread context class loader is switched to the loader of
- * the {@link Configuration} class, to avoid caching of inappropriate class loader in the Configuration
- * object. The previous context class loader is restored afterwards.
- *
- * @return New instance of {@link Configuration}.
- */
- public static Configuration safeCreateConfiguration() {
- ClassLoader cfgLdr = Configuration.class.getClassLoader();
-
- final ClassLoader prevLdr = setContextClassLoader(cfgLdr);
-
- try {
- Configuration res = new Configuration();
-
- return res;
- }
- finally {
- restoreContextClassLoader(prevLdr);
- }
- }
-
-
-
- /**
- * Replaces the context class loader of the current thread.
- *
- * @param newLdr New class loader.
- * @return Class loader that was set before the call.
- */
- @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) {
- Thread curThread = Thread.currentThread();
-
- ClassLoader prevLdr = curThread.getContextClassLoader();
-
- // Skip the setter call when the loader is already the requested one.
- if (prevLdr != newLdr)
- curThread.setContextClassLoader(newLdr);
-
- return prevLdr;
- }
-
- /**
- * Puts the given class loader back as the context class loader of the current thread.
- *
- * @param oldLdr Original class loader.
- */
- public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) {
- Thread curThread = Thread.currentThread();
-
- // Nothing to do when the original loader is still in place.
- if (curThread.getContextClassLoader() == oldLdr)
- return;
-
- curThread.setContextClassLoader(oldLdr);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
deleted file mode 100644
index 3f682d3..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Default Hadoop counter implementation. Stores the counter group and name and serializes them,
- * delegating serialization of the counter value to subclasses.
- */
-public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Counter group name. */
- private String grp;
-
- /** Counter name. */
- private String name;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- protected HadoopCounterAdapter() {
- // No-op.
- }
-
- /**
- * Creates new counter with given group and name.
- *
- * @param grp Counter group name.
- * @param name Counter name.
- */
- protected HadoopCounterAdapter(String grp, String name) {
- assert grp != null : "counter must have group";
- assert name != null : "counter must have name";
-
- this.grp = grp;
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public String group() {
- return grp;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(grp);
- out.writeUTF(name);
- writeValue(out);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- // Fields are read in the same order as they are written by writeExternal().
- grp = in.readUTF();
- name = in.readUTF();
- readValue(in);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- HadoopCounterAdapter cntr = (HadoopCounterAdapter)o;
-
- // Null-safe comparison: group and name are not set by the default constructor.
- return java.util.Objects.equals(grp, cntr.grp) && java.util.Objects.equals(name, cntr.name);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- // Null-safe for the same reason as equals(); yields the same value as before for non-null fields.
- int res = java.util.Objects.hashCode(grp);
- res = 31 * res + java.util.Objects.hashCode(name);
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopCounterAdapter.class, this);
- }
-
- /**
- * Writes value of this counter to output.
- *
- * @param out Output.
- * @throws IOException If failed.
- */
- protected abstract void writeValue(ObjectOutput out) throws IOException;
-
- /**
- * Reads value of this counter from input.
- *
- * @param in Input.
- * @throws IOException If failed.
- */
- protected abstract void readValue(ObjectInput in) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
deleted file mode 100644
index f3b5463..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.Constructor;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.util.lang.GridTuple3;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jsr166.ConcurrentHashMap8;
-
-/**
- * Default in-memory counters store.
- */
-public class HadoopCountersImpl implements HadoopCounters, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
-
- /**
- * Default constructor. Creates new instance without counters.
- */
- public HadoopCountersImpl() {
- // No-op.
- }
-
- /**
- * Creates new instance that contain given counters.
- *
- * @param cntrs Counters to store.
- */
- public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) {
- addCounters(cntrs, true);
- }
-
- /**
- * Copy constructor.
- *
- * @param cntrs Counters to copy.
- */
- public HadoopCountersImpl(HadoopCounters cntrs) {
- this(cntrs.all());
- }
-
- /**
- * Creates counter instance using the public {@code (String, String)} constructor of the given class.
- *
- * @param cls Class of the counter.
- * @param grp Group name.
- * @param name Counter name.
- * @return Counter.
- * @throws IgniteException If the constructor cannot be found or the instantiation fails.
- */
- @SuppressWarnings("unchecked")
- private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp,
- String name) {
- try {
- Constructor<? extends HadoopCounter> ctor = cls.getConstructor(String.class, String.class);
-
- // The compiler cannot relate cls to T, hence the unchecked cast.
- return (T)ctor.newInstance(grp, name);
- }
- catch (Exception e) {
- throw new IgniteException(e);
- }
- }
-
- /**
- * Puts the given counters into the map, replacing entries that have the same class, group and name.
- *
- * @param cntrs Counters to add.
- * @param cp Whether to store fresh copies (created and then merged with the source) or the given instances.
- */
- private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) {
- assert cntrs != null;
-
- for (HadoopCounter src : cntrs) {
- HadoopCounter stored = src;
-
- if (cp) {
- stored = createCounter(src.getClass(), src.group(), src.name());
-
- stored.merge(src);
- }
-
- CounterKey key = new CounterKey(stored.getClass(), stored.group(), stored.name());
-
- cntrsMap.put(key, stored);
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
- assert cls != null;
-
- CounterKey key = new CounterKey(cls, grp, name);
-
- T existing = (T)cntrsMap.get(key);
-
- if (existing != null)
- return existing;
-
- T created = createCounter(cls, grp, name);
-
- // Another thread may have registered the counter in the meantime; its instance wins.
- T raced = (T)cntrsMap.putIfAbsent(key, created);
-
- return raced != null ? raced : created;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<HadoopCounter> all() {
- return cntrsMap.values();
- }
-
- /** {@inheritDoc} */
- @Override public void merge(HadoopCounters other) {
- for (HadoopCounter src : other.all()) {
- // Finds or creates the local counter of the same class, group and name.
- HadoopCounter target = counter(src.group(), src.name(), src.getClass());
-
- target.merge(src);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeCollection(out, cntrsMap.values());
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- addCounters(U.<HadoopCounter>readCollection(in), false);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- HadoopCountersImpl counters = (HadoopCountersImpl)o;
-
- return cntrsMap.equals(counters.cntrsMap);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return cntrsMap.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values());
- }
-
- /**
- * Key of the counters map: the tuple of counter class, group name and counter name.
- */
- private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /**
- * Constructor.
- *
- * @param cls Class of the counter.
- * @param grp Group name.
- * @param name Counter name.
- */
- private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) {
- super(cls, grp, name);
- }
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public CounterKey() {
- // No-op.
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
deleted file mode 100644
index 0d61e0d..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-/**
- * Counter holding a single {@code long} value, to use via original Hadoop API in Hadoop jobs.
- */
-public class HadoopLongCounter extends HadoopCounterAdapter {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** The counter value. */
- private long val;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public HadoopLongCounter() {
- // No-op.
- }
-
- /**
- * Creates a counter with the given group and name.
- *
- * @param grp Group name.
- * @param name Counter name.
- */
- public HadoopLongCounter(String grp, String name) {
- super(grp, name);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeValue(ObjectOutput out) throws IOException {
- out.writeLong(this.val);
- }
-
- /** {@inheritDoc} */
- @Override protected void readValue(ObjectInput in) throws IOException {
- this.val = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public void merge(HadoopCounter cntr) {
- HadoopLongCounter other = (HadoopLongCounter)cntr;
-
- // Merging long counters sums their values.
- val += other.val;
- }
-
- /**
- * Returns the value currently held by this counter.
- *
- * @return Current value.
- */
- public long value() {
- return this.val;
- }
-
- /**
- * Replaces the current value with the given one.
- *
- * @param val Value to set.
- */
- public void value(long val) {
- this.val = val;
- }
-
- /**
- * Adds the given value to this counter.
- *
- * @param i Value to increase this counter by.
- */
- public void increment(long i) {
- this.val = this.val + i;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
deleted file mode 100644
index dedc6b3..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.JOB_SUBMISSION_START_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.REQ_NEW_JOBID_TS_PROPERTY;
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.RESPONSE_NEW_JOBID_TS_PROPERTY;
-
-/**
- * Counter for the job statistics accumulation.
- */
-public class HadoopPerformanceCounter extends HadoopCounterAdapter {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** The group name for this counter. */
- private static final String GROUP_NAME = "SYSTEM";
-
- /** The counter name for this counter. */
- private static final String COUNTER_NAME = "PERFORMANCE";
-
- /** Events collections. */
- private Collection<T2<String,Long>> evts = new ArrayList<>();
-
- /** Node id to insert into the event info. */
- private UUID nodeId;
-
- /** */
- private int reducerNum;
-
- /** */
- private volatile Long firstShuffleMsg;
-
- /** */
- private volatile Long lastShuffleMsg;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public HadoopPerformanceCounter() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param grp Group name.
- * @param name Counter name.
- */
- public HadoopPerformanceCounter(String grp, String name) {
- super(grp, name);
- }
-
- /**
- * Constructor to create instance to use this as helper.
- *
- * @param nodeId Id of the work node.
- */
- public HadoopPerformanceCounter(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override protected void writeValue(ObjectOutput out) throws IOException {
- U.writeCollection(out, evts);
- }
-
- /** {@inheritDoc} */
- @Override protected void readValue(ObjectInput in) throws IOException {
- try {
- evts = U.readCollection(in);
- }
- catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void merge(HadoopCounter cntr) {
- evts.addAll(((HadoopPerformanceCounter)cntr).evts);
- }
-
- /**
- * Gets the events collection.
- *
- * @return Collection of event.
- */
- public Collection<T2<String, Long>> evts() {
- return evts;
- }
-
- /**
- * Generate name that consists of some event information.
- *
- * @param info Task info.
- * @param evtType The type of the event.
- * @return String contains necessary event information.
- */
- private String eventName(HadoopTaskInfo info, String evtType) {
- return eventName(info.type().toString(), info.taskNumber(), evtType);
- }
-
- /**
- * Generate name that consists of some event information.
- *
- * @param taskType Task type.
- * @param taskNum Number of the task.
- * @param evtType The type of the event.
- * @return String contains necessary event information.
- */
- private String eventName(String taskType, int taskNum, String evtType) {
- assert nodeId != null;
-
- return taskType + " " + taskNum + " " + evtType + " " + nodeId;
- }
-
- /**
- * Adds event of the task submission (task instance creation).
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskSubmit(HadoopTaskInfo info, long ts) {
- evts.add(new T2<>(eventName(info, "submit"), ts));
- }
-
- /**
- * Adds event of the task preparation.
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskPrepare(HadoopTaskInfo info, long ts) {
- evts.add(new T2<>(eventName(info, "prepare"), ts));
- }
-
- /**
- * Adds event of the task finish. For a reduce task with a recorded shuffle message, shuffle start and finish events are added first.
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskFinish(HadoopTaskInfo info, long ts) {
- if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) {
- evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
- evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
-
- lastShuffleMsg = null; // NOTE(review): firstShuffleMsg is not reset here - confirm this is intended.
- }
-
- evts.add(new T2<>(eventName(info, "finish"), ts));
- }
-
- /**
- * Adds event of the task run.
- *
- * @param info Task info.
- * @param ts Timestamp of the event.
- */
- public void onTaskStart(HadoopTaskInfo info, long ts) {
- evts.add(new T2<>(eventName(info, "start"), ts));
- }
-
- /**
- * Adds event of the job preparation.
- *
- * @param ts Timestamp of the event.
- */
- public void onJobPrepare(long ts) {
- assert nodeId != null;
-
- evts.add(new T2<>("JOB prepare " + nodeId, ts));
- }
-
- /**
- * Adds event of the job start.
- *
- * @param ts Timestamp of the event.
- */
- public void onJobStart(long ts) {
- assert nodeId != null;
-
- evts.add(new T2<>("JOB start " + nodeId, ts));
- }
-
- /**
- * Adds client submission events from job info.
- *
- * @param info Job info.
- */
- public void clientSubmissionEvents(HadoopJobInfo info) {
- assert nodeId != null;
-
- addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
- addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
- addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
- }
-
- /**
- * Adds an event whose timestamp is parsed from a job info property; does nothing when the property is empty.
- *
- * @param evt Event type and phase.
- * @param info Job info.
- * @param propName Property name to get timestamp.
- * @throws IllegalStateException If the property value is not a valid long.
- */
- private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) {
- String val = info.property(propName);
-
- if (F.isEmpty(val))
- return;
-
- long ts;
-
- try {
- ts = Long.parseLong(val);
- }
- catch (NumberFormatException e) {
- throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
- }
-
- evts.add(new T2<>(evt + " " + nodeId, ts));
- }
-
- /**
- * Registers shuffle message event: stores the reducer number, the first timestamp seen and the latest timestamp.
- *
- * @param reducerNum Number of reducer that receives the data.
- * @param ts Timestamp of the event.
- */
- public void onShuffleMessage(int reducerNum, long ts) {
- this.reducerNum = reducerNum;
-
- if (firstShuffleMsg == null) // NOTE(review): check-then-set on a volatile field is not atomic - confirm there is a single writer.
- firstShuffleMsg = ts;
-
- lastShuffleMsg = ts;
- }
-
- /**
- * Gets system predefined performance counter from the HadoopCounters object.
- *
- * @param cntrs HadoopCounters object.
- * @param nodeId Node id for methods that add events. It may be null if you don't use them.
- * @return Predefined performance counter, with the node id applied when one is given.
- */
- public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) {
- HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
-
- if (nodeId != null)
- cntr.nodeId(nodeId);
-
- // Return the instance that was just configured instead of looking it up a second time.
- return cntr;
- }
-
- /**
- * Sets the nodeId field.
- *
- * @param nodeId Node id.
- */
- private void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java
deleted file mode 100644
index 82769be..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopBasicFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.delegate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.BasicHadoopFileSystemFactory;
-import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
-import org.apache.ignite.hadoop.util.UserNameMapper;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.common.delegate.HadoopFileSystemFactoryDelegate;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lifecycle.LifecycleAware;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
-
-/**
- * Basic Hadoop file system factory delegate.
- */
-public class HadoopBasicFileSystemFactoryDelegate implements HadoopFileSystemFactoryDelegate {
-    /** Proxy: the user-facing factory whose settings are read in {@link #start()}. */
-    protected final HadoopFileSystemFactory proxy;
-
-    /** Configuration of the secondary file system; created in {@link #start()}. */
-    protected Configuration cfg;
-
-    /** Resulting URI; resolved in {@link #start()}. */
-    protected URI fullUri;
-
-    /** User name mapper, {@code null} if the proxy does not provide one. */
-    private UserNameMapper usrNameMapper;
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy.
-     */
-    public HadoopBasicFileSystemFactoryDelegate(BasicHadoopFileSystemFactory proxy) {
-        this.proxy = proxy;
-    }
-
-    /**
-     * Gets a file system for the given user. The name is normalized with {@code IgfsUtils.fixUserName()},
-     * passed through the user name mapper (if any), normalized again and handed to
-     * {@link #getWithMappedName(String)}.
-     *
-     * @param name User name.
-     * @return File system.
-     * @throws IOException If failed.
-     */
-    @Override public FileSystem get(String name) throws IOException {
-        String name0 = IgfsUtils.fixUserName(name);
-
-        if (usrNameMapper != null)
-            name0 = IgfsUtils.fixUserName(usrNameMapper.map(name0));
-
-        return getWithMappedName(name0);
-    }
-
-    /**
-     * Internal file system create routine. Runs {@link #create(String)} with the context class loader
-     * switched to the class loader of this class and restores the previous loader afterwards.
-     *
-     * @param usrName User name.
-     * @return File system.
-     * @throws IOException If failed, including the case when the thread was interrupted.
-     */
-    protected FileSystem getWithMappedName(String usrName) throws IOException {
-        assert cfg != null;
-
-        try {
-            // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
-            // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
-            // classloader to classloader of current class to avoid strange class-cast-exceptions.
-            ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-            try {
-                return create(usrName);
-            }
-            finally {
-                HadoopUtils.restoreContextClassLoader(oldLdr);
-            }
-        }
-        catch (InterruptedException e) {
-            // Keep the interrupt flag set for the callers up the stack.
-            Thread.currentThread().interrupt();
-
-            throw new IOException("Failed to create file system due to interrupt.", e);
-        }
-    }
-
-    /**
-     * Internal file system creation routine, invoked in correct class loader context.
-     *
-     * @param usrName User name.
-     * @return File system.
-     * @throws IOException If failed.
-     * @throws InterruptedException If the current thread is interrupted.
-     */
-    protected FileSystem create(String usrName) throws IOException, InterruptedException {
-        return FileSystem.get(fullUri, cfg, usrName);
-    }
-
-    /**
-     * Builds the configuration from the proxy's configuration paths, resolves the file system URI
-     * and starts the user name mapper if it is {@link LifecycleAware}.
-     *
-     * @throws IgniteException If a configuration path cannot be resolved or the URI is malformed.
-     * @throws NullPointerException If one of the configuration paths is {@code null}.
-     */
-    @Override public void start() throws IgniteException {
-        BasicHadoopFileSystemFactory proxy0 = (BasicHadoopFileSystemFactory)proxy;
-
-        cfg = HadoopUtils.safeCreateConfiguration();
-
-        // Read the getter once instead of on every use.
-        String[] cfgPaths = proxy0.getConfigPaths();
-
-        if (cfgPaths != null) {
-            for (String cfgPath : cfgPaths) {
-                if (cfgPath == null)
-                    throw new NullPointerException("Configuration path cannot be null: " +
-                        Arrays.toString(cfgPaths));
-
-                URL url = U.resolveIgniteUrl(cfgPath);
-
-                if (url == null) {
-                    // If secConfPath is given, it should be resolvable:
-                    throw new IgniteException("Failed to resolve secondary file system configuration path " +
-                        "(ensure that it exists locally and you have read access to it): " + cfgPath);
-                }
-
-                cfg.addResource(url);
-            }
-        }
-
-        String uri = proxy0.getUri();
-
-        // If secondary fs URI is not given explicitly, try to get it from the configuration:
-        if (uri == null)
-            fullUri = FileSystem.getDefaultUri(cfg);
-        else {
-            try {
-                fullUri = new URI(uri);
-            }
-            catch (URISyntaxException use) {
-                // Keep the original exception as the cause so the exact syntax problem is not lost.
-                throw new IgniteException("Failed to resolve secondary file system URI: " + uri, use);
-            }
-        }
-
-        usrNameMapper = proxy0.getUserNameMapper();
-
-        // instanceof is false for null, so no separate null check is needed.
-        if (usrNameMapper instanceof LifecycleAware)
-            ((LifecycleAware)usrNameMapper).start();
-    }
-
-    /**
-     * Stops the user name mapper if it is {@link LifecycleAware}.
-     *
-     * @throws IgniteException If failed.
-     */
-    @Override public void stop() throws IgniteException {
-        // instanceof is false for null, so no separate null check is needed.
-        if (usrNameMapper instanceof LifecycleAware)
-            ((LifecycleAware)usrNameMapper).stop();
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
deleted file mode 100644
index 04bbeb8..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopCachingFileSystemFactoryDelegate.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.delegate;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-
-import java.io.IOException;
-
-/**
- * Caching Hadoop file system factory delegate.
- */
-public class HadoopCachingFileSystemFactoryDelegate extends HadoopBasicFileSystemFactoryDelegate {
-    /** File systems keyed by mapped user name; missing entries are produced through the value factory. */
-    private final HadoopLazyConcurrentMap<String, FileSystem> cache = new HadoopLazyConcurrentMap<>(
-        new HadoopLazyConcurrentMap.ValueFactory<String, FileSystem>() {
-            @Override public FileSystem createValue(String usrName) throws IOException {
-                return createUncached(usrName);
-            }
-        }
-    );
-
-    /**
-     * Constructor.
-     *
-     * @param proxy Proxy.
-     */
-    public HadoopCachingFileSystemFactoryDelegate(CachingHadoopFileSystemFactory proxy) {
-        super(proxy);
-    }
-
-    /**
-     * Returns the file system stored in the cache for the given user name, creating it on demand.
-     *
-     * @param name Mapped user name.
-     * @return File system.
-     * @throws IOException If failed.
-     */
-    @Override public FileSystem getWithMappedName(String name) throws IOException {
-        return cache.getOrCreate(name);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        super.start();
-
-        // Disable caching: set the scheme-specific "disable FS cache" property to true.
-        String disableCacheProp = HadoopFileSystemsUtils.disableFsCachePropertyName(fullUri.getScheme());
-
-        cfg.setBoolean(disableCacheProp, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        super.stop();
-
-        try {
-            cache.close();
-        }
-        catch (IgniteCheckedException e) {
-            // Convert the checked exception into the unchecked one declared by this method.
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * Creates a file system bypassing the cache by calling the parent implementation directly.
-     *
-     * @param usrName Mapped user name.
-     * @return Newly created file system.
-     * @throws IOException If failed.
-     */
-    private FileSystem createUncached(String usrName) throws IOException {
-        return super.getWithMappedName(usrName);
-    }
-}