You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:22 UTC
[90/92] [abbrv] ignite git commit: Moved another big part of logic
from "hadoop-impl" to "hadoop".
Moved another big part of logic from "hadoop-impl" to "hadoop".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53237dd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53237dd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53237dd3
Branch: refs/heads/ignite-3949
Commit: 53237dd3ee2957f95ca3f9674a6488bc9d02b85b
Parents: 8607846
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 17:29:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 17:29:44 2016 +0300
----------------------------------------------------------------------
bin/setup-hadoop.bat | 2 +-
bin/setup-hadoop.sh | 2 +-
.../ignite/internal/IgniteComponentType.java | 2 +-
.../hadoop/impl/HadoopAttributes.java | 168 ------
.../processors/hadoop/impl/HadoopImpl.java | 138 -----
.../processors/hadoop/impl/HadoopProcessor.java | 242 ---------
.../processors/hadoop/impl/HadoopSetup.java | 542 -------------------
.../processors/hadoop/HadoopAttributes.java | 168 ++++++
.../internal/processors/hadoop/HadoopImpl.java | 134 +++++
.../processors/hadoop/HadoopProcessor.java | 230 ++++++++
.../internal/processors/hadoop/HadoopSetup.java | 542 +++++++++++++++++++
11 files changed, 1077 insertions(+), 1093 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/bin/setup-hadoop.bat
----------------------------------------------------------------------
diff --git a/bin/setup-hadoop.bat b/bin/setup-hadoop.bat
index 198a1b0..a11ef8c 100644
--- a/bin/setup-hadoop.bat
+++ b/bin/setup-hadoop.bat
@@ -23,6 +23,6 @@
if "%OS%" == "Windows_NT" setlocal
-set MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.impl.HadoopSetup
+set MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.HadoopSetup
call "%~dp0\ignite.bat" %*
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/bin/setup-hadoop.sh
----------------------------------------------------------------------
diff --git a/bin/setup-hadoop.sh b/bin/setup-hadoop.sh
index 3188e4d..8870c75 100755
--- a/bin/setup-hadoop.sh
+++ b/bin/setup-hadoop.sh
@@ -55,7 +55,7 @@ setIgniteHome
#
# Set utility environment.
#
-export MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.impl.HadoopSetup
+export MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.HadoopSetup
#
# Start utility.
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
index 0aabdf4..fa5240e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
@@ -37,7 +37,7 @@ public enum IgniteComponentType {
/** Hadoop. */
HADOOP(
"org.apache.ignite.internal.processors.hadoop.HadoopNoopProcessor",
- "org.apache.ignite.internal.processors.hadoop.impl.HadoopProcessor",
+ "org.apache.ignite.internal.processors.hadoop.HadoopProcessor",
"ignite-hadoop-impl"
),
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java
deleted file mode 100644
index 23eaa18..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAttributes.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Arrays;
-
-/**
- * Hadoop attributes.
- */
-public class HadoopAttributes implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Attribute name. */
- public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop";
-
- /** Map-reduce planner class name. */
- private String plannerCls;
-
- /** External executor flag. */
- private boolean extExec;
-
- /** Maximum parallel tasks. */
- private int maxParallelTasks;
-
- /** Maximum task queue size. */
- private int maxTaskQueueSize;
-
- /** Library names. */
- @GridToStringExclude
- private String[] libNames;
-
- /** Number of cores. */
- private int cores;
-
- /**
- * Get attributes for node (if any).
- *
- * @param node Node.
- * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
- */
- @Nullable public static HadoopAttributes forNode(ClusterNode node) {
- return node.attribute(NAME);
- }
-
- /**
- * {@link Externalizable} support.
- */
- public HadoopAttributes() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cfg Configuration.
- */
- public HadoopAttributes(HadoopConfiguration cfg) {
- assert cfg != null;
- assert cfg.getMapReducePlanner() != null;
-
- plannerCls = cfg.getMapReducePlanner().getClass().getName();
-
- // TODO: IGNITE-404: Get from configuration when fixed.
- extExec = false;
-
- maxParallelTasks = cfg.getMaxParallelTasks();
- maxTaskQueueSize = cfg.getMaxTaskQueueSize();
- libNames = cfg.getNativeLibraryNames();
-
- // Cores count already passed in other attributes, we add it here for convenience.
- cores = Runtime.getRuntime().availableProcessors();
- }
-
- /**
- * @return Map reduce planner class name.
- */
- public String plannerClassName() {
- return plannerCls;
- }
-
- /**
- * @return External execution flag.
- */
- public boolean externalExecution() {
- return extExec;
- }
-
- /**
- * @return Maximum parallel tasks.
- */
- public int maxParallelTasks() {
- return maxParallelTasks;
- }
-
- /**
- * @return Maximum task queue size.
- */
- public int maxTaskQueueSize() {
- return maxTaskQueueSize;
- }
-
-
- /**
- * @return Native library names.
- */
- public String[] nativeLibraryNames() {
- return libNames;
- }
-
- /**
- * @return Number of cores on machine.
- */
- public int cores() {
- return cores;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(plannerCls);
- out.writeBoolean(extExec);
- out.writeInt(maxParallelTasks);
- out.writeInt(maxTaskQueueSize);
- out.writeObject(libNames);
- out.writeInt(cores);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- plannerCls = (String)in.readObject();
- extExec = in.readBoolean();
- maxParallelTasks = in.readInt();
- maxTaskQueueSize = in.readInt();
- libNames = (String[])in.readObject();
- cores = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames));
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java
deleted file mode 100644
index 80309df..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopImpl.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop facade implementation.
- */
-public class HadoopImpl implements Hadoop {
- /** Hadoop processor. */
- private final HadoopProcessor proc;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /**
- * Constructor.
- *
- * @param proc Hadoop processor.
- */
- HadoopImpl(HadoopProcessor proc) {
- this.proc = proc;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration configuration() {
- return proc.config();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobId nextJobId() {
- if (busyLock.enterBusy()) {
- try {
- return proc.nextJobId();
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
- if (busyLock.enterBusy()) {
- try {
- return proc.submit(jobId, jobInfo);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to submit job (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.status(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job status (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.counters(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job counters (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.finishFuture(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- return proc.kill(jobId);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to kill job (grid is stopping).");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java
deleted file mode 100644
index a77e918..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopProcessor.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopAttributes;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
-import org.apache.ignite.internal.processors.hadoop.HadoopClasspathUtils;
-import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
-import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.HadoopLocations;
-import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Hadoop processor.
- */
-public class HadoopProcessor extends HadoopProcessorAdapter {
- /** Job ID counter. */
- private final AtomicInteger idCtr = new AtomicInteger();
-
- /** Hadoop context. */
- @GridToStringExclude
- private HadoopContext hctx;
-
- /** Hadoop facade for public API. */
- @GridToStringExclude
- private Hadoop hadoop;
-
- /**
- * Constructor.
- *
- * @param ctx Kernal context.
- */
- public HadoopProcessor(GridKernalContext ctx) {
- super(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- if (ctx.isDaemon())
- return;
-
- HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
-
- if (cfg == null)
- cfg = new HadoopConfiguration();
- else
- cfg = new HadoopConfiguration(cfg);
-
- initializeDefaults(cfg);
-
- hctx = new HadoopContext(
- ctx,
- cfg,
- new HadoopJobTracker(),
- new HadoopEmbeddedTaskExecutor(),
- // TODO: IGNITE-404: Uncomment when fixed.
- //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
- new HadoopShuffle());
-
- for (HadoopComponent c : hctx.components())
- c.start(hctx);
-
- hadoop = new HadoopImpl(this);
-
- ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- if (hctx == null)
- return;
-
- for (HadoopComponent c : hctx.components())
- c.onKernalStart();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- super.onKernalStop(cancel);
-
- if (hctx == null)
- return;
-
- List<HadoopComponent> components = hctx.components();
-
- for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
- HadoopComponent c = it.previous();
-
- c.onKernalStop(cancel);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- super.stop(cancel);
-
- if (hctx == null)
- return;
-
- List<HadoopComponent> components = hctx.components();
-
- for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
- HadoopComponent c = it.previous();
-
- c.stop(cancel);
- }
- }
-
- /**
- * Gets Hadoop context.
- *
- * @return Hadoop context.
- */
- public HadoopContext context() {
- return hctx;
- }
-
- /** {@inheritDoc} */
- @Override public Hadoop hadoop() {
- if (hadoop == null)
- throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
- "is HADOOP_HOME environment variable set?)");
-
- return hadoop;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopConfiguration config() {
- return hctx.configuration();
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobId nextJobId() {
- return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
- ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
-
- try {
- return hctx.jobTracker().submit(jobId, jobInfo);
- }
- finally {
- HadoopCommonUtils.restoreContextClassLoader(oldLdr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().status(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().jobCounters(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().finishFuture(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
- return hctx.jobTracker().killJob(jobId);
- }
-
- /** {@inheritDoc} */
- @Override public void validateEnvironment() throws IgniteCheckedException {
- // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here.
- try {
- HadoopLocations loc = HadoopClasspathUtils.locations();
-
- if (!F.isEmpty(loc.home()))
- U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home());
-
- U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " +
- loc.mapred());
- }
- catch (IOException ioe) {
- throw new IgniteCheckedException(ioe.getMessage(), ioe);
- }
-
- HadoopClassLoader.hadoopUrls();
- }
-
- /**
- * Initializes default hadoop configuration.
- *
- * @param cfg Hadoop configuration.
- */
- private void initializeDefaults(HadoopConfiguration cfg) {
- if (cfg.getMapReducePlanner() == null)
- cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopProcessor.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java
deleted file mode 100644
index f62c999..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSetup.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Scanner;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
-import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
-
-/**
- * Setup tool to configure Hadoop client.
- */
-public class HadoopSetup {
- /** */
- public static final String WINUTILS_EXE = "winutils.exe";
-
- /** */
- private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return name.startsWith("ignite-") && name.endsWith(".jar");
- }
- };
-
- /**
- * The main method.
- * @param ignore Params.
- */
- public static void main(String[] ignore) {
- X.println(
- " __________ ________________ ",
- " / _/ ___/ |/ / _/_ __/ __/ ",
- " _/ // (7 7 // / / / / _/ ",
- "/___/\\___/_/|_/___/ /_/ /___/ ",
- " for Apache Hadoop ",
- " ",
- "ver. " + ACK_VER_STR,
- COPYRIGHT);
-
- configureHadoop();
- }
-
- /**
- * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop.
- * It performs these operations:
- * <ul>
- * <li>Check for setting of HADOOP_HOME environment variable.</li>
- * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li>
- * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li>
- * <li>In Windows check new line character issues in CMD scripts.</li>
- * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li>
- * </ul>
- */
- private static void configureHadoop() {
- String igniteHome = U.getIgniteHome();
-
- println("IGNITE_HOME is set to '" + igniteHome + "'.");
-
- checkIgniteHome(igniteHome);
-
- String homeVar = "HADOOP_HOME";
- String hadoopHome = System.getenv(homeVar);
-
- if (F.isEmpty(hadoopHome)) {
- homeVar = "HADOOP_PREFIX";
- hadoopHome = System.getenv(homeVar);
- }
-
- if (F.isEmpty(hadoopHome))
- exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
- "valid Hadoop installation directory and run setup tool again.", null);
-
- hadoopHome = hadoopHome.replaceAll("\"", "");
-
- println(homeVar + " is set to '" + hadoopHome + "'.");
-
- String hiveHome = System.getenv("HIVE_HOME");
-
- if (!F.isEmpty(hiveHome)) {
- hiveHome = hiveHome.replaceAll("\"", "");
-
- println("HIVE_HOME is set to '" + hiveHome + "'.");
- }
-
- File hadoopDir = new File(hadoopHome);
-
- if (!hadoopDir.exists())
- exit("Hadoop installation folder does not exist.", null);
-
- if (!hadoopDir.isDirectory())
- exit("HADOOP_HOME must point to a directory.", null);
-
- if (!hadoopDir.canRead())
- exit("Hadoop installation folder can not be read. Please check permissions.", null);
-
- final File hadoopCommonDir;
-
- String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
-
- if (F.isEmpty(hadoopCommonHome)) {
- hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
-
- println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
- }
- else {
- println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
-
- hadoopCommonDir = new File(hadoopCommonHome);
- }
-
- if (!hadoopCommonDir.canRead())
- exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null);
-
- final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
-
- if (!hadoopCommonLibDir.canRead())
- exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
-
- if (U.isWindows()) {
- checkJavaPathSpaces();
-
- final File hadoopBinDir = new File(hadoopDir, "bin");
-
- if (!hadoopBinDir.canRead())
- exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
-
- File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
-
- if (!winutilsFile.exists()) {
- if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
- "It may be replaced by a stub. Create it?")) {
- println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
-
- boolean ok = false;
-
- try {
- ok = winutilsFile.createNewFile();
- }
- catch (IOException ignore) {
- // No-op.
- }
-
- if (!ok)
- exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
- }
- else
- println("Ok. But Hadoop client probably will not work on Windows this way...");
- }
-
- processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
- }
-
- File igniteLibs = new File(new File(igniteHome), "libs");
-
- if (!igniteLibs.exists())
- exit("Ignite 'libs' folder is not found.", null);
-
- Collection<File> jarFiles = new ArrayList<>();
-
- addJarsInFolder(jarFiles, igniteLibs);
- addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
- addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
-
- boolean jarsLinksCorrect = true;
-
- for (File file : jarFiles) {
- File link = new File(hadoopCommonLibDir, file.getName());
-
- jarsLinksCorrect &= isJarLinkCorrect(link, file);
-
- if (!jarsLinksCorrect)
- break;
- }
-
- if (!jarsLinksCorrect) {
- if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
- "Create appropriate symbolic links?")) {
- File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);
-
- if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
- "installation. They must be deleted to continue. Continue?")) {
- for (File file : oldIgniteJarFiles) {
- println("Deleting file '" + file.getAbsolutePath() + "'.");
-
- if (!file.delete())
- exit("Failed to delete file '" + file.getPath() + "'.", null);
- }
- }
-
- for (File file : jarFiles) {
- File targetFile = new File(hadoopCommonLibDir, file.getName());
-
- try {
- println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");
-
- Files.createSymbolicLink(targetFile.toPath(), file.toPath());
- }
- catch (IOException e) {
- if (U.isWindows()) {
- warn("Ability to create symbolic links is required!");
- warn("On Windows platform you have to grant permission 'Create symbolic links'");
- warn("to your user or run the Accelerator as Administrator.");
- }
-
- exit("Creating symbolic link failed! Check permissions.", e);
- }
- }
- }
- else
- println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
- }
-
- File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");
-
- File igniteHadoopCfg = igniteHadoopConfig(igniteHome);
-
- if (!igniteHadoopCfg.canRead())
- exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null);
-
- if (hadoopEtc.canWrite()) { // TODO Bigtop
- if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
- "(existing files will be backed up)?")) {
- replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"),
- new File(hadoopEtc, "core-site.xml"));
-
- replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"),
- new File(hadoopEtc, "mapred-site.xml"));
- }
- else
- println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
- }
-
- if (!F.isEmpty(hiveHome)) {
- File hiveConfDir = new File(hiveHome + File.separator + "conf");
-
- if (!hiveConfDir.canWrite())
- warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
- "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
- else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
- replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"),
- new File(hiveConfDir, "hive-site.xml"));
- else
- println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
- }
-
- println("Apache Hadoop setup is complete.");
- }
-
- /**
- * Get Ignite Hadoop config directory.
- *
- * @param igniteHome Ignite home.
- * @return Ignite Hadoop config directory.
- */
- private static File igniteHadoopConfig(String igniteHome) {
- Path path = Paths.get(igniteHome, "modules", "hadoop", "config");
-
- if (!Files.exists(path))
- path = Paths.get(igniteHome, "config", "hadoop");
-
- if (Files.exists(path))
- return path.toFile();
- else
- return new File(igniteHome, "docs");
- }
-
- /**
- * @param jarFiles Jars.
- * @param folder Folder.
- */
- private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
- if (!folder.exists())
- exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
-
- jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS)));
- }
-
- /**
- * Checks that JAVA_HOME does not contain space characters.
- */
- private static void checkJavaPathSpaces() {
- String javaHome = System.getProperty("java.home");
-
- if (javaHome.contains(" ")) {
- warn("Java installation path contains space characters!");
- warn("Hadoop client will not be able to start using '" + javaHome + "'.");
- warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
- }
- }
-
- /**
- * Checks Ignite home.
- *
- * @param igniteHome Ignite home.
- */
- private static void checkIgniteHome(String igniteHome) {
- URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();
-
- try {
- Path jar = Paths.get(jarUrl.toURI());
- Path igHome = Paths.get(igniteHome);
-
- if (!jar.startsWith(igHome))
- exit("Ignite JAR files are not under IGNITE_HOME.", null);
- }
- catch (Exception e) {
- exit(e.getMessage(), e);
- }
- }
-
- /**
- * Replaces target file with source file.
- *
- * @param from From.
- * @param to To.
- */
- private static void replaceWithBackup(File from, File to) {
- if (!from.canRead())
- exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
-
- println("Replacing file '" + to.getAbsolutePath() + "'.");
-
- try {
- U.copy(from, renameToBak(to), true);
- }
- catch (IOException e) {
- exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
- }
- }
-
- /**
- * Renames file for backup.
- *
- * @param file File.
- * @return File.
- */
- private static File renameToBak(File file) {
- DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
-
- if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
- exit("Failed to rename file '" + file.getPath() + "'.", null);
-
- return file;
- }
-
- /**
- * Checks if link is correct.
- *
- * @param link Symbolic link.
- * @param correctTarget Correct link target.
- * @return {@code true} If link target is correct.
- */
- private static boolean isJarLinkCorrect(File link, File correctTarget) {
- if (!Files.isSymbolicLink(link.toPath()))
- return false; // It is a real file or it does not exist.
-
- Path target = null;
-
- try {
- target = Files.readSymbolicLink(link.toPath());
- }
- catch (IOException e) {
- exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
- }
-
- return Files.exists(target) && target.toFile().equals(correctTarget);
- }
-
- /**
- * Writes the question end read the boolean answer from the console.
- *
- * @param question Question to write.
- * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
- */
- private static boolean ask(String question) {
- X.println();
- X.print(" < " + question + " (Y/N): ");
-
- String answer = null;
-
- if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
- answer = "Y";
- else {
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
-
- try {
- answer = br.readLine();
- }
- catch (IOException e) {
- exit("Failed to read answer: " + e.getMessage(), e);
- }
- }
-
- if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
- X.println(" > Yes.");
-
- return true;
- }
- else {
- X.println(" > No.");
-
- return false;
- }
- }
-
- /**
- * Exit with message.
- *
- * @param msg Exit message.
- */
- private static void exit(String msg, Exception e) {
- X.println(" ");
- X.println(" # " + msg);
- X.println(" # Setup failed, exiting... ");
-
- if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
- e.printStackTrace();
-
- System.exit(1);
- }
-
- /**
- * Prints message.
- *
- * @param msg Message.
- */
- private static void println(String msg) {
- X.println(" > " + msg);
- }
-
- /**
- * Prints warning.
- *
- * @param msg Message.
- */
- private static void warn(String msg) {
- X.println(" ! " + msg);
- }
-
- /**
- * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the
- * answer. If it's 'Y' then backups original files and corrects invalid new line characters.
- *
- * @param rootDir Root directory to process.
- * @param dirs Directories inside of the root to process.
- */
- private static void processCmdFiles(File rootDir, String... dirs) {
- boolean answer = false;
-
- for (String dir : dirs) {
- File subDir = new File(rootDir, dir);
-
- File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
- @Override public boolean accept(File dir, String name) {
- return name.toLowerCase().endsWith(".cmd");
- }
- });
-
- for (File file : cmdFiles) {
- String content = null;
-
- try (Scanner scanner = new Scanner(file)) {
- content = scanner.useDelimiter("\\Z").next();
- }
- catch (FileNotFoundException e) {
- exit("Failed to read file '" + file + "'.", e);
- }
-
- boolean invalid = false;
-
- for (int i = 0; i < content.length(); i++) {
- if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
- invalid = true;
-
- break;
- }
- }
-
- if (invalid) {
- answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?");
-
- if (!answer) {
- println("Ok. But Windows most probably will fail to execute them...");
-
- return;
- }
-
- println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
-
- renameToBak(file);
-
- try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
- for (int i = 0; i < content.length(); i++) {
- if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
- writer.write("\r");
-
- writer.write(content.charAt(i));
- }
- }
- catch (IOException e) {
- exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
- }
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
new file mode 100644
index 0000000..23eaa18
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+/**
+ * Hadoop attributes of a node: an {@link Externalizable} holder of selected {@link HadoopConfiguration} values plus the core count.
+ */
+public class HadoopAttributes implements Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Name of the node attribute that {@link #forNode(ClusterNode)} reads. */
+ public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop";
+
+ /** Fully qualified class name of the configured map-reduce planner. */
+ private String plannerCls;
+
+ /** External executor flag; the configuration-based constructor currently always sets it to {@code false}. */
+ private boolean extExec;
+
+ /** Maximum parallel tasks, as returned by {@code HadoopConfiguration.getMaxParallelTasks()}. */
+ private int maxParallelTasks;
+
+ /** Maximum task queue size, as returned by {@code HadoopConfiguration.getMaxTaskQueueSize()}. */
+ private int maxTaskQueueSize;
+
+ /** Native library names from the configuration; excluded from reflective {@code toString} and passed explicitly instead. */
+ @GridToStringExclude
+ private String[] libNames;
+
+ /** Number of processors reported by {@link Runtime#availableProcessors()} when the instance was created from a configuration. */
+ private int cores;
+
+ /**
+ * Gets the Hadoop attributes stored on the node under {@link #NAME} (if any).
+ *
+ * @param node Node.
+ * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
+ */
+ @Nullable public static HadoopAttributes forNode(ClusterNode node) {
+ return node.attribute(NAME);
+ }
+
+ /**
+ * No-arg constructor required for {@link Externalizable} support; fields are filled in by {@link #readExternal(ObjectInput)}.
+ */
+ public HadoopAttributes() {
+ // No-op.
+ }
+
+ /**
+ * Creates attributes from the given configuration; the configuration and its map-reduce planner are asserted to be non-null.
+ *
+ * @param cfg Configuration.
+ */
+ public HadoopAttributes(HadoopConfiguration cfg) {
+ assert cfg != null;
+ assert cfg.getMapReducePlanner() != null;
+
+ plannerCls = cfg.getMapReducePlanner().getClass().getName();
+
+ // TODO: IGNITE-404: Get from configuration when fixed.
+ extExec = false;
+
+ maxParallelTasks = cfg.getMaxParallelTasks();
+ maxTaskQueueSize = cfg.getMaxTaskQueueSize();
+ libNames = cfg.getNativeLibraryNames();
+
+ // Cores count already passed in other attributes, we add it here for convenience.
+ cores = Runtime.getRuntime().availableProcessors();
+ }
+
+ /**
+ * @return Map reduce planner class name.
+ */
+ public String plannerClassName() {
+ return plannerCls;
+ }
+
+ /**
+ * @return External execution flag.
+ */
+ public boolean externalExecution() {
+ return extExec;
+ }
+
+ /**
+ * @return Maximum parallel tasks.
+ */
+ public int maxParallelTasks() {
+ return maxParallelTasks;
+ }
+
+ /**
+ * @return Maximum task queue size.
+ */
+ public int maxTaskQueueSize() {
+ return maxTaskQueueSize;
+ }
+
+
+ /**
+ * @return Native library names; this is the internal array itself, no defensive copy is made.
+ */
+ public String[] nativeLibraryNames() {
+ return libNames;
+ }
+
+ /**
+ * @return Number of cores on machine.
+ */
+ public int cores() {
+ return cores;
+ }
+
+ /** {@inheritDoc} Fields are written in exactly the order in which {@link #readExternal(ObjectInput)} reads them. */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(plannerCls);
+ out.writeBoolean(extExec);
+ out.writeInt(maxParallelTasks);
+ out.writeInt(maxTaskQueueSize);
+ out.writeObject(libNames);
+ out.writeInt(cores);
+ }
+
+ /** {@inheritDoc} Fields are read in exactly the order in which {@link #writeExternal(ObjectOutput)} writes them. */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ plannerCls = (String)in.readObject();
+ extExec = in.readBoolean();
+ maxParallelTasks = in.readInt();
+ maxTaskQueueSize = in.readInt();
+ libNames = (String[])in.readObject();
+ cores = in.readInt();
+ }
+
+ /** {@inheritDoc} The library names are passed as an extra "libNames" argument, pre-formatted with {@link Arrays#toString(Object[])}. */
+ @Override public String toString() {
+ return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
new file mode 100644
index 0000000..ed2657e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop facade implementation: forwards every call to {@link HadoopProcessor}, entering a busy lock around job operations.
+ */
+public class HadoopImpl implements Hadoop {
+ /** Hadoop processor that all calls are delegated to. */
+ private final HadoopProcessor proc;
+
+ /** Busy lock entered around each job operation. NOTE(review): nothing in this class ever blocks it - confirm who should. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /**
+ * Constructor.
+ *
+ * @param proc Hadoop processor.
+ */
+ HadoopImpl(HadoopProcessor proc) {
+ this.proc = proc;
+ }
+
+ /** {@inheritDoc} Unlike the job operations below, this call does not enter the busy lock. */
+ @Override public HadoopConfiguration configuration() {
+ return proc.config();
+ }
+
+ /** {@inheritDoc} Throws {@link IllegalStateException} when the busy lock cannot be entered. */
+ @Override public HadoopJobId nextJobId() {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.nextJobId();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get next job ID (grid is stopping).");
+ }
+
+ /** {@inheritDoc} Throws {@link IllegalStateException} when the busy lock cannot be entered. */
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.submit(jobId, jobInfo);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to submit job (grid is stopping).");
+ }
+
+ /** {@inheritDoc} Throws {@link IllegalStateException} when the busy lock cannot be entered. */
+ @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.status(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job status (grid is stopping).");
+ }
+
+ /** {@inheritDoc} Throws {@link IllegalStateException} when the busy lock cannot be entered. */
+ @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.counters(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job counters (grid is stopping).");
+ }
+
+ /** {@inheritDoc} Throws {@link IllegalStateException} when the busy lock cannot be entered. */
+ @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.finishFuture(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get job finish future (grid is stopping).");
+ }
+
+ /** {@inheritDoc} Throws {@link IllegalStateException} when the busy lock cannot be entered. */
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ return proc.kill(jobId);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to kill job (grid is stopping).");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
new file mode 100644
index 0000000..520f094
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffle;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopEmbeddedTaskExecutor;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Hadoop processor: builds the {@link HadoopContext}, drives the life cycle of its components and delegates job calls to the job tracker.
+ */
+public class HadoopProcessor extends HadoopProcessorAdapter {
+ /** Job ID counter; incremented for every ID produced by {@link #nextJobId()}. */
+ private final AtomicInteger idCtr = new AtomicInteger();
+
+ /** Hadoop context; stays {@code null} when {@link #start()} returned early on a daemon node. */
+ @GridToStringExclude
+ private HadoopContext hctx;
+
+ /** Hadoop facade for public API; stays {@code null} when {@link #start()} returned early on a daemon node. */
+ @GridToStringExclude
+ private Hadoop hadoop;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+ public HadoopProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} No-op on daemon nodes; otherwise copies the configuration, starts all components and adds the {@link HadoopAttributes} node attribute. */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.isDaemon())
+ return;
+
+ HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+
+ if (cfg == null)
+ cfg = new HadoopConfiguration();
+ else
+ cfg = new HadoopConfiguration(cfg);
+
+ initializeDefaults(cfg);
+
+ hctx = new HadoopContext(
+ ctx,
+ cfg,
+ new HadoopJobTracker(),
+ new HadoopEmbeddedTaskExecutor(),
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
+ new HadoopShuffle());
+
+ for (HadoopComponent c : hctx.components())
+ c.start(hctx);
+
+ hadoop = new HadoopImpl(this);
+
+ ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
+ }
+
+ /** {@inheritDoc} Forwards the callback to every component in list order; does nothing more when no context was created. */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ if (hctx == null)
+ return;
+
+ for (HadoopComponent c : hctx.components())
+ c.onKernalStart();
+ }
+
+ /** {@inheritDoc} Forwards the callback to the components in reverse list order. */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (hctx == null)
+ return;
+
+ List<HadoopComponent> components = hctx.components();
+
+ for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+ HadoopComponent c = it.previous();
+
+ c.onKernalStop(cancel);
+ }
+ }
+
+ /** {@inheritDoc} Stops the components in reverse list order. */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ super.stop(cancel);
+
+ if (hctx == null)
+ return;
+
+ List<HadoopComponent> components = hctx.components();
+
+ for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+ HadoopComponent c = it.previous();
+
+ c.stop(cancel);
+ }
+ }
+
+ /**
+ * Gets Hadoop context.
+ *
+ * @return Hadoop context, or {@code null} if {@link #start()} has not created one (not started yet or daemon node).
+ */
+ public HadoopContext context() {
+ return hctx;
+ }
+
+ /** {@inheritDoc} NOTE(review): the facade is also {@code null} on daemon nodes, where the classpath hint in the message does not apply. */
+ @Override public Hadoop hadoop() {
+ if (hadoop == null)
+ throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
+ "is HADOOP_HOME environment variable set?)");
+
+ return hadoop;
+ }
+
+ /** {@inheritDoc} NOTE(review): dereferences {@code hctx} without a null check, so this throws NPE when no context was created. */
+ @Override public HadoopConfiguration config() {
+ return hctx.configuration();
+ }
+
+ /** {@inheritDoc} The ID combines the local node ID with the next value of the job ID counter. */
+ @Override public HadoopJobId nextJobId() {
+ return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+ }
+
+ /** {@inheritDoc} The submit runs with this class's class loader set through {@code HadoopCommonUtils}; the old one is restored afterwards. */
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+ try {
+ return hctx.jobTracker().submit(jobId, jobInfo);
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /** {@inheritDoc} Delegates to the job tracker. */
+ @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().status(jobId);
+ }
+
+ /** {@inheritDoc} Delegates to the job tracker. */
+ @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().jobCounters(jobId);
+ }
+
+ /** {@inheritDoc} Delegates to the job tracker. */
+ @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().finishFuture(jobId);
+ }
+
+ /** {@inheritDoc} Delegates to the job tracker. */
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+ return hctx.jobTracker().killJob(jobId);
+ }
+
+ /** {@inheritDoc} Logs the resolved Hadoop locations; an {@link IOException} is rethrown as {@link IgniteCheckedException} with the cause kept. */
+ @Override public void validateEnvironment() throws IgniteCheckedException {
+ // Perform some static checks as early as possible, so that any recoverable exceptions are thrown here.
+ try {
+ HadoopLocations loc = HadoopClasspathUtils.locations();
+
+ if (!F.isEmpty(loc.home()))
+ U.quietAndInfo(log, HadoopClasspathUtils.HOME + " is set to " + loc.home());
+
+ U.quietAndInfo(log, "Resolved Hadoop classpath locations: " + loc.common() + ", " + loc.hdfs() + ", " +
+ loc.mapred());
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe.getMessage(), ioe);
+ }
+
+ HadoopClassLoader.hadoopUrls(); // Result is discarded; presumably called only so that resolution failures surface here - TODO confirm.
+ }
+
+ /**
+ * Initializes default hadoop configuration: installs an {@link IgniteHadoopMapReducePlanner} when no planner is configured.
+ *
+ * @param cfg Hadoop configuration.
+ */
+ private void initializeDefaults(HadoopConfiguration cfg) {
+ if (cfg.getMapReducePlanner() == null)
+ cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
+ }
+
+ /** {@inheritDoc} The context and the facade are excluded via {@link GridToStringExclude}. */
+ @Override public String toString() {
+ return S.toString(HadoopProcessor.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53237dd3/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
new file mode 100644
index 0000000..ed39ce5
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Scanner;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+
+/**
+ * Setup tool to configure Hadoop client.
+ */
+public class HadoopSetup {
+ /** File name of the Windows helper executable that the setup looks for in Hadoop's 'bin' directory. */
+ public static final String WINUTILS_EXE = "winutils.exe";
+
+ /** Filter accepting file names that start with "ignite-" and end with ".jar". */
+ private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
+ @Override public boolean accept(File dir, String name) {
+ return name.startsWith("ignite-") && name.endsWith(".jar");
+ }
+ };
+
+ /**
+ * The main method: prints the Ignite banner with version and copyright, then runs the Hadoop client configuration.
+ * @param ignore Command line arguments (not used).
+ */
+ public static void main(String[] ignore) {
+ X.println(
+ " __________ ________________ ",
+ " / _/ ___/ |/ / _/_ __/ __/ ",
+ " _/ // (7 7 // / / / / _/ ",
+ "/___/\\___/_/|_/___/ /_/ /___/ ",
+ " for Apache Hadoop ",
+ " ",
+ "ver. " + ACK_VER_STR,
+ COPYRIGHT);
+
+ configureHadoop();
+ }
+
+ /**
+ * Prepares a clean unpacked Hadoop distribution to work as a client with Ignite-Hadoop; fatal problems are reported through {@code exit(...)}.
+ * It performs these operations:
+ * <ul>
+ * <li>Checks that HADOOP_HOME (or, as a fallback, HADOOP_PREFIX) is set and points to an existing, readable directory.</li>
+ * <li>Uses HADOOP_COMMON_HOME if set, otherwise {@code share/hadoop/common} under the Hadoop home, and checks its 'lib' folder.</li>
+ * <li>On Windows: warns about spaces in the Java path, offers to create a winutils.exe stub and checks new lines in CMD scripts.</li>
+ * <li>Checks that Hadoop's 'lib' directory has correct symbolic links to the Ignite JARs; if not, offers to create them.</li>
+ * <li>Offers to replace 'core-site.xml', 'mapred-site.xml' and (if HIVE_HOME is set) 'hive-site.xml' with Ignite templates.</li>
+ * </ul>
+ */
+ private static void configureHadoop() {
+ String igniteHome = U.getIgniteHome();
+
+ println("IGNITE_HOME is set to '" + igniteHome + "'.");
+
+ checkIgniteHome(igniteHome);
+
+ String homeVar = "HADOOP_HOME";
+ String hadoopHome = System.getenv(homeVar);
+
+ if (F.isEmpty(hadoopHome)) {
+ homeVar = "HADOOP_PREFIX";
+ hadoopHome = System.getenv(homeVar);
+ }
+
+ if (F.isEmpty(hadoopHome))
+ exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
+ "valid Hadoop installation directory and run setup tool again.", null);
+
+ hadoopHome = hadoopHome.replaceAll("\"", "");
+
+ println(homeVar + " is set to '" + hadoopHome + "'.");
+
+ String hiveHome = System.getenv("HIVE_HOME");
+
+ if (!F.isEmpty(hiveHome)) {
+ hiveHome = hiveHome.replaceAll("\"", "");
+
+ println("HIVE_HOME is set to '" + hiveHome + "'.");
+ }
+
+ File hadoopDir = new File(hadoopHome);
+
+ if (!hadoopDir.exists())
+ exit("Hadoop installation folder does not exist.", null);
+
+ if (!hadoopDir.isDirectory())
+ exit("HADOOP_HOME must point to a directory.", null); // NOTE(review): names HADOOP_HOME even when HADOOP_PREFIX was used.
+
+ if (!hadoopDir.canRead())
+ exit("Hadoop installation folder can not be read. Please check permissions.", null);
+
+ final File hadoopCommonDir;
+
+ String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
+
+ if (F.isEmpty(hadoopCommonHome)) {
+ hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
+
+ println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
+ }
+ else {
+ println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
+
+ hadoopCommonDir = new File(hadoopCommonHome);
+ }
+
+ if (!hadoopCommonDir.canRead())
+ exit("Failed to read Hadoop common dir '" + hadoopCommonDir + "'.", null);
+
+ final File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
+
+ if (!hadoopCommonLibDir.canRead())
+ exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
+
+ if (U.isWindows()) {
+ checkJavaPathSpaces();
+
+ final File hadoopBinDir = new File(hadoopDir, "bin");
+
+ if (!hadoopBinDir.canRead())
+ exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
+
+ File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
+
+ if (!winutilsFile.exists()) {
+ if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
+ "It may be replaced by a stub. Create it?")) {
+ println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
+
+ boolean ok = false;
+
+ try {
+ ok = winutilsFile.createNewFile();
+ }
+ catch (IOException ignore) {
+ // Ignored on purpose: 'ok' stays false and the failure is reported right below.
+ }
+
+ if (!ok)
+ exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
+ }
+ else
+ println("Ok. But Hadoop client probably will not work on Windows this way...");
+ }
+
+ processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
+ }
+
+ File igniteLibs = new File(new File(igniteHome), "libs");
+
+ if (!igniteLibs.exists())
+ exit("Ignite 'libs' folder is not found.", null);
+
+ Collection<File> jarFiles = new ArrayList<>();
+
+ addJarsInFolder(jarFiles, igniteLibs);
+ addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
+ addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop-impl"));
+
+ boolean jarsLinksCorrect = true;
+
+ for (File file : jarFiles) {
+ File link = new File(hadoopCommonLibDir, file.getName());
+
+ jarsLinksCorrect &= isJarLinkCorrect(link, file);
+
+ if (!jarsLinksCorrect)
+ break;
+ }
+
+ if (!jarsLinksCorrect) {
+ if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
+ "Create appropriate symbolic links?")) {
+ File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);
+
+ if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
+ "installation. They must be deleted to continue. Continue?")) {
+ for (File file : oldIgniteJarFiles) {
+ println("Deleting file '" + file.getAbsolutePath() + "'.");
+
+ if (!file.delete())
+ exit("Failed to delete file '" + file.getPath() + "'.", null);
+ }
+ }
+
+ for (File file : jarFiles) { // NOTE(review): also reached when the user declined the deletion above.
+ File targetFile = new File(hadoopCommonLibDir, file.getName());
+
+ try {
+ println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");
+
+ Files.createSymbolicLink(targetFile.toPath(), file.toPath());
+ }
+ catch (IOException e) {
+ if (U.isWindows()) {
+ warn("Ability to create symbolic links is required!");
+ warn("On Windows platform you have to grant permission 'Create symbolic links'");
+ warn("to your user or run the Accelerator as Administrator.");
+ }
+
+ exit("Creating symbolic link failed! Check permissions.", e);
+ }
+ }
+ }
+ else
+ println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
+ }
+
+ File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");
+
+ File igniteHadoopCfg = igniteHadoopConfig(igniteHome);
+
+ if (!igniteHadoopCfg.canRead())
+ exit("Failed to read Ignite Hadoop 'config' folder at '" + igniteHadoopCfg.getAbsolutePath() + "'.", null);
+
+ if (hadoopEtc.canWrite()) { // TODO Bigtop
+ if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
+ "(existing files will be backed up)?")) {
+ replaceWithBackup(new File(igniteHadoopCfg, "core-site.ignite.xml"),
+ new File(hadoopEtc, "core-site.xml"));
+
+ replaceWithBackup(new File(igniteHadoopCfg, "mapred-site.ignite.xml"),
+ new File(hadoopEtc, "mapred-site.xml"));
+ }
+ else
+ println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
+ }
+
+ if (!F.isEmpty(hiveHome)) {
+ File hiveConfDir = new File(hiveHome + File.separator + "conf");
+
+ if (!hiveConfDir.canWrite())
+ warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
+ "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
+ else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
+ replaceWithBackup(new File(igniteHadoopCfg, "hive-site.ignite.xml"),
+ new File(hiveConfDir, "hive-site.xml"));
+ else
+ println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
+ }
+
+ println("Apache Hadoop setup is complete.");
+ }
+
+    /**
+     * Locates the directory holding Ignite Hadoop configuration templates.
+     * <p>
+     * Candidates are probed in order: {@code modules/hadoop/config}, then {@code config/hadoop}, both relative
+     * to Ignite home. If neither exists, the {@code docs} directory under Ignite home is returned without
+     * checking that it exists.
+     *
+     * @param igniteHome Ignite home.
+     * @return Ignite Hadoop config directory.
+     */
+    private static File igniteHadoopConfig(String igniteHome) {
+        Path modulesCfg = Paths.get(igniteHome, "modules", "hadoop", "config");
+
+        if (Files.exists(modulesCfg))
+            return modulesCfg.toFile();
+
+        Path plainCfg = Paths.get(igniteHome, "config", "hadoop");
+
+        return Files.exists(plainCfg) ? plainCfg.toFile() : new File(igniteHome, "docs");
+    }
+
+    /**
+     * Adds the files of the given folder that are accepted by the {@code IGNITE_JARS} filter to the collection.
+     * Exits the setup if the folder does not exist or if its content cannot be listed.
+     *
+     * @param jarFiles Jars.
+     * @param folder Folder.
+     */
+    private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
+        if (!folder.exists())
+            exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
+
+        // File.listFiles() returns null (not an empty array) when the path is not a directory
+        // or an I/O error occurs, so the result must be checked before it is used.
+        File[] found = folder.listFiles(IGNITE_JARS);
+
+        if (found == null)
+            exit("Failed to list files in folder '" + folder.getAbsolutePath() + "'.", null);
+        else
+            jarFiles.addAll(Arrays.asList(found));
+    }
+
+    /**
+     * Prints warnings if the path of the running Java installation (the {@code java.home} system property)
+     * contains space characters.
+     */
+    private static void checkJavaPathSpaces() {
+        String jreDir = System.getProperty("java.home");
+
+        // Nothing to report when the path has no spaces.
+        if (jreDir.indexOf(' ') < 0)
+            return;
+
+        warn("Java installation path contains space characters!");
+        warn("Hadoop client will not be able to start using '" + jreDir + "'.");
+        warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
+    }
+
+    /**
+     * Checks that the JAR this tool is running from is located under the given Ignite home.
+     * Exits the setup if it is not, or if the JAR location cannot be determined.
+     *
+     * @param igniteHome Ignite home.
+     */
+    private static void checkIgniteHome(String igniteHome) {
+        try {
+            // The code source (or its location) may be absent, e.g. for classes defined by a custom class loader.
+            java.security.CodeSource codeSrc = U.class.getProtectionDomain().getCodeSource();
+
+            URL jarUrl = codeSrc == null ? null : codeSrc.getLocation();
+
+            if (jarUrl == null) {
+                exit("Failed to determine location of Ignite JAR files.", null);
+
+                return;
+            }
+
+            // Compare absolute, normalized paths so that a relative IGNITE_HOME or one containing
+            // '.' / '..' segments does not fail the check spuriously.
+            Path jar = Paths.get(jarUrl.toURI()).toAbsolutePath().normalize();
+            Path igHome = Paths.get(igniteHome).toAbsolutePath().normalize();
+
+            if (!jar.startsWith(igHome))
+                exit("Ignite JAR files are not under IGNITE_HOME.", null);
+        }
+        catch (Exception e) {
+            exit(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Replaces the target file with a copy of the source file. An existing target is first renamed
+     * to a backup via {@code renameToBak}. Exits the setup if the source cannot be read or the copy fails.
+     *
+     * @param from From.
+     * @param to To.
+     */
+    private static void replaceWithBackup(File from, File to) {
+        if (!from.canRead())
+            exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
+
+        String dest = to.getAbsolutePath();
+
+        println("Replacing file '" + dest + "'.");
+
+        try {
+            // Move the current file (if any) out of the way before copying the template over it.
+            File cleared = renameToBak(to);
+
+            U.copy(from, cleared, true);
+        }
+        catch (IOException e) {
+            exit("Failed to replace file '" + dest + "'.", e);
+        }
+    }
+
+    /**
+     * Renames an existing file to {@code <absolute path>.<yyyy-MM-dd_HH-mm-ss>.bak}. Does nothing if the
+     * file does not exist; exits the setup if the rename fails.
+     *
+     * @param file File.
+     * @return The same {@code file} object that was passed in (still denoting the original path).
+     */
+    private static File renameToBak(File file) {
+        if (file.exists()) {
+            DateFormat stampFmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
+
+            String bakName = file.getAbsolutePath() + "." + stampFmt.format(new Date()) + ".bak";
+
+            boolean renamed = file.renameTo(new File(bakName));
+
+            if (!renamed)
+                exit("Failed to rename file '" + file.getPath() + "'.", null);
+        }
+
+        return file;
+    }
+
+    /**
+     * Checks if link is correct: it must be a symbolic link whose target exists and equals the expected file.
+     *
+     * @param link Symbolic link.
+     * @param correctTarget Correct link target.
+     * @return {@code true} If link target is correct.
+     */
+    private static boolean isJarLinkCorrect(File link, File correctTarget) {
+        Path linkPath = link.toPath();
+
+        if (!Files.isSymbolicLink(linkPath))
+            return false; // It is a real file or it does not exist.
+
+        Path target;
+
+        try {
+            target = Files.readSymbolicLink(linkPath);
+        }
+        catch (IOException e) {
+            exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
+
+            return false; // Only needed for definite assignment: exit() calls System.exit().
+        }
+
+        // A relative link target is relative to the directory that contains the link, not to the
+        // current working directory, so resolve it against the link location before checking it.
+        if (!target.isAbsolute())
+            target = linkPath.toAbsolutePath().resolveSibling(target).normalize();
+
+        return Files.exists(target) && target.toFile().equals(correctTarget);
+    }
+
+ /**
+ * Writes the question end read the boolean answer from the console.
+ *
+ * @param question Question to write.
+ * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
+ */
+ private static boolean ask(String question) {
+ X.println();
+ X.print(" < " + question + " (Y/N): ");
+
+ String answer = null;
+
+ if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
+ answer = "Y";
+ else {
+ BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+ try {
+ answer = br.readLine();
+ }
+ catch (IOException e) {
+ exit("Failed to read answer: " + e.getMessage(), e);
+ }
+ }
+
+ if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
+ X.println(" > Yes.");
+
+ return true;
+ }
+ else {
+ X.println(" > No.");
+
+ return false;
+ }
+ }
+
+    /**
+     * Prints the failure message and terminates the JVM with exit code 1.
+     * <p>
+     * The stack trace of the given exception is printed only when the exception is not {@code null} and
+     * the {@code IGNITE_HADOOP_SETUP_DEBUG} environment variable is not empty (as decided by {@code F.isEmpty}).
+     *
+     * @param msg Exit message.
+     * @param e Exception that caused the failure, or {@code null} if there is none.
+     */
+    private static void exit(String msg, Exception e) {
+        X.println(" ");
+        X.println(" # " + msg);
+        X.println(" # Setup failed, exiting... ");
+
+        if (e != null) {
+            String debugFlag = System.getenv("IGNITE_HADOOP_SETUP_DEBUG");
+
+            if (!F.isEmpty(debugFlag))
+                e.printStackTrace();
+        }
+
+        System.exit(1);
+    }
+
+    /**
+     * Prints an informational message prefixed with the {@code >} marker.
+     *
+     * @param msg Message.
+     */
+    private static void println(String msg) {
+        String line = " > " + msg;
+
+        X.println(line);
+    }
+
+    /**
+     * Prints a warning message prefixed with the {@code !} marker.
+     *
+     * @param msg Message.
+     */
+    private static void warn(String msg) {
+        String line = " ! " + msg;
+
+        X.println(line);
+    }
+
+    /**
+     * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and
+     * reads the answer. If it's 'Y' then backs up the original files and corrects invalid new line characters.
+     * <p>
+     * The question is asked at most once; a negative answer stops processing immediately. Directories that
+     * do not exist or cannot be listed are skipped.
+     *
+     * @param rootDir Root directory to process.
+     * @param dirs Directories inside of the root to process.
+     */
+    private static void processCmdFiles(File rootDir, String... dirs) {
+        boolean answer = false;
+
+        for (String dir : dirs) {
+            File subDir = new File(rootDir, dir);
+
+            File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
+                @Override public boolean accept(File parent, String name) {
+                    return name.toLowerCase().endsWith(".cmd");
+                }
+            });
+
+            // listFiles() returns null when the path is not a directory or cannot be read: nothing to fix there.
+            if (cmdFiles == null)
+                continue;
+
+            for (File file : cmdFiles) {
+                String content;
+
+                try {
+                    // Read the file as is (platform default charset, same as the writer below), so that an
+                    // empty file and the final line terminator are handled correctly.
+                    content = new String(Files.readAllBytes(file.toPath()));
+                }
+                catch (IOException e) {
+                    exit("Failed to read file '" + file + "'.", e);
+
+                    return; // Only needed for definite assignment: exit() calls System.exit().
+                }
+
+                if (!hasBareLineFeed(content))
+                    continue;
+
+                answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?");
+
+                if (!answer) {
+                    println("Ok. But Windows most probably will fail to execute them...");
+
+                    return;
+                }
+
+                println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
+
+                renameToBak(file);
+
+                try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+                    for (int i = 0; i < content.length(); i++) {
+                        if (isBareLineFeed(content, i))
+                            writer.write("\r");
+
+                        writer.write(content.charAt(i));
+                    }
+                }
+                catch (IOException e) {
+                    exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks whether the text contains at least one line feed that is not preceded by a carriage return.
+     *
+     * @param content Text to check.
+     * @return {@code true} if such a line feed is found.
+     */
+    private static boolean hasBareLineFeed(String content) {
+        for (int i = 0; i < content.length(); i++) {
+            if (isBareLineFeed(content, i))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether the character at the given index is a line feed not preceded by a carriage return.
+     *
+     * @param content Text to check.
+     * @param idx Index of the character to check.
+     * @return {@code true} if the character is a line feed without a preceding carriage return.
+     */
+    private static boolean isBareLineFeed(String content, int idx) {
+        return content.charAt(idx) == '\n' && (idx == 0 || content.charAt(idx - 1) != '\r');
+    }
+}
\ No newline at end of file