You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:18 UTC
[02/31] incubator-ignite git commit: # IGNITE-386: WIP on internal
namings.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
new file mode 100644
index 0000000..f896daa
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+/**
+ * Task executor.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** */
+ private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
+
+ /** Executor service to run tasks. */
+ private GridHadoopExecutorService exec;
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ jobTracker = ctx.jobTracker();
+
+ exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(),
+ ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ if (exec != null) {
+ exec.shutdown(3000);
+
+ if (cancel) {
+ for (GridHadoopJobId jobId : jobs.keySet())
+ cancelTasks(jobId);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ if (exec != null && !exec.shutdown(30000))
+ U.warn(log, "Failed to finish running tasks in 30 sec.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
+ ", tasksCnt=" + tasks.size() + ']');
+
+ Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id());
+
+ if (executedTasks == null) {
+ executedTasks = new GridConcurrentHashSet<>();
+
+ Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
+
+ assert extractedCol == null;
+ }
+
+ final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks;
+
+ for (final GridHadoopTaskInfo info : tasks) {
+ assert info != null;
+
+ GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+ ctx.localNodeId()) {
+ @Override protected void onTaskFinished(GridHadoopTaskStatus status) {
+ if (log.isDebugEnabled())
+ log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
+ "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
+
+ finalExecutedTasks.remove(this);
+
+ jobTracker.onTaskFinished(info, status);
+ }
+
+ @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return ctx.shuffle().input(taskCtx);
+ }
+
+ @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return ctx.shuffle().output(taskCtx);
+ }
+ };
+
+ executedTasks.add(task);
+
+ exec.submit(task);
+ }
+ }
+
+ /**
+ * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+ * for this job ID.
+ * <p>
+ * It is guaranteed that this method will not be called concurrently with
+ * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
+ * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
+ *
+ * @param jobId Job ID to cancel.
+ */
+ @Override public void cancelTasks(GridHadoopJobId jobId) {
+ Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId);
+
+ if (executedTasks != null) {
+ for (GridHadoopRunnableTask task : executedTasks)
+ task.cancel();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException {
+ if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
+ Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
+
+ assert executedTasks == null || executedTasks.isEmpty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
new file mode 100644
index 0000000..a3d3bf7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+
+import java.util.*;
+
+/**
+ * Common superclass for task executor.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+ /**
+ * Runs tasks.
+ *
+ * @param job Job.
+ * @param tasks Tasks.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException;
+
+ /**
+ * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+ * for this job ID.
+ * <p>
+ * It is guaranteed that this method will not be called concurrently with
+ * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
+ * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
+ *
+ * @param jobId Job ID to cancel.
+ */
+ public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * On job state change callback;
+ *
+ * @param meta Job metadata.
+ */
+ public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java
deleted file mode 100644
index 72185c0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java
+++ /dev/null
@@ -1,960 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
-
-/**
- * External process registry. Handles external process lifecycle.
- */
-public class GridHadoopExternalTaskExecutor extends GridHadoopTaskExecutorAdapter {
- /** Hadoop context. */
- private GridHadoopContext ctx;
-
- /** */
- private String javaCmd;
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Node process descriptor. */
- private GridHadoopProcessDescriptor nodeDesc;
-
- /** Output base. */
- private File outputBase;
-
- /** Path separator. */
- private String pathSep;
-
- /** Hadoop external communication. */
- private GridHadoopExternalCommunication comm;
-
- /** Starting processes. */
- private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
-
- /** Starting processes. */
- private final ConcurrentMap<GridHadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
-
- /** Busy lock. */
- private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
-
- /** Job tracker. */
- private GridHadoopJobTracker jobTracker;
-
- /** {@inheritDoc} */
- @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException {
- this.ctx = ctx;
-
- log = ctx.kernalContext().log(GridHadoopExternalTaskExecutor.class);
-
- outputBase = U.resolveWorkDirectory("hadoop", false);
-
- pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
-
- initJavaCommand();
-
- comm = new GridHadoopExternalCommunication(
- ctx.localNodeId(),
- UUID.randomUUID(),
- ctx.kernalContext().config().getMarshaller(),
- log,
- ctx.kernalContext().getSystemExecutorService(),
- ctx.kernalContext().gridName());
-
- comm.setListener(new MessageListener());
-
- comm.start();
-
- nodeDesc = comm.localProcessDescriptor();
-
- ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
- GridHadoopExternalTaskExecutor.class);
-
- if (nodeDesc.sharedMemoryPort() != -1)
- ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
- GridHadoopExternalTaskExecutor.class);
-
- jobTracker = ctx.jobTracker();
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) {
- busyLock.writeLock();
-
- try {
- comm.stop();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onJobStateChanged(final GridHadoopJobMetadata meta) {
- final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
-
- // If we have a local process for this job.
- if (proc != null) {
- if (log.isDebugEnabled())
- log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
-
- if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
- if (log.isDebugEnabled())
- log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
- ", proc=" + proc + ']');
-
- runningProcsByJobId.remove(meta.jobId());
- runningProcsByProcId.remove(proc.descriptor().processId());
-
- proc.terminate();
-
- return;
- }
-
- if (proc.initFut.isDone()) {
- if (!proc.initFut.isFailed())
- sendJobInfoUpdate(proc, meta);
- else if (log.isDebugEnabled())
- log.debug("Failed to initialize child process (will skip job state notification) " +
- "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
- }
- else {
- proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
- @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
- try {
- f.get();
-
- sendJobInfoUpdate(proc, meta);
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to initialize child process (will skip job state notification) " +
- "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
- }
-
- }
- });
- }
- }
- else if (ctx.isParticipating(meta)) {
- GridHadoopJob job;
-
- try {
- job = jobTracker.job(meta.jobId(), meta.jobInfo());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to get job: " + meta.jobId(), e);
-
- return;
- }
-
- startProcess(job, meta.mapReducePlan());
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
- if (!busyLock.tryReadLock()) {
- if (log.isDebugEnabled())
- log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
-
- return;
- }
-
- try {
- HadoopProcess proc = runningProcsByJobId.get(job.id());
-
- GridHadoopTaskType taskType = F.first(tasks).type();
-
- if (taskType == GridHadoopTaskType.SETUP || taskType == GridHadoopTaskType.ABORT ||
- taskType == GridHadoopTaskType.COMMIT) {
- if (proc == null || proc.terminated()) {
- runningProcsByJobId.remove(job.id(), proc);
-
- // Start new process for ABORT task since previous processes were killed.
- proc = startProcess(job, jobTracker.plan(job.id()));
-
- if (log.isDebugEnabled())
- log.debug("Starting new process for maintenance task [jobId=" + job.id() +
- ", proc=" + proc + ", taskType=" + taskType + ']');
- }
- }
- else
- assert proc != null : "Missing started process for task execution request: " + job.id() +
- ", tasks=" + tasks;
-
- final HadoopProcess proc0 = proc;
-
- proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
- @Override public void apply(
- IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
- if (!busyLock.tryReadLock())
- return;
-
- try {
- f.get();
-
- proc0.addTasks(tasks);
-
- if (log.isDebugEnabled())
- log.debug("Sending task execution request to child process [jobId=" + job.id() +
- ", proc=" + proc0 + ", tasks=" + tasks + ']');
-
- sendExecutionRequest(proc0, job, tasks);
- }
- catch (IgniteCheckedException e) {
- notifyTasksFailed(tasks, FAILED, e);
- }
- finally {
- busyLock.readUnlock();
- }
- }
- });
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancelTasks(GridHadoopJobId jobId) {
- HadoopProcess proc = runningProcsByJobId.get(jobId);
-
- if (proc != null)
- proc.terminate();
- }
-
- /**
- * Sends execution request to remote node.
- *
- * @param proc Process to send request to.
- * @param job Job instance.
- * @param tasks Collection of tasks to execute in started process.
- */
- private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks)
- throws IgniteCheckedException {
- // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
- proc.lock();
-
- try {
- if (proc.terminated()) {
- notifyTasksFailed(tasks, CRASHED, null);
-
- return;
- }
-
- GridHadoopTaskExecutionRequest req = new GridHadoopTaskExecutionRequest();
-
- req.jobId(job.id());
- req.jobInfo(job.info());
- req.tasks(tasks);
-
- comm.sendMessage(proc.descriptor(), req);
- }
- finally {
- proc.unlock();
- }
- }
-
- /**
- * @return External task metadata.
- */
- private GridHadoopExternalTaskMetadata buildTaskMeta() {
- GridHadoopExternalTaskMetadata meta = new GridHadoopExternalTaskMetadata();
-
- meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
- meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
- "-DIGNITE_HOME=" + U.getIgniteHome()));
-
- return meta;
- }
-
- /**
- * @param tasks Tasks to notify about.
- * @param state Fail state.
- * @param e Optional error.
- */
- private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) {
- GridHadoopTaskStatus fail = new GridHadoopTaskStatus(state, e);
-
- for (GridHadoopTaskInfo task : tasks)
- jobTracker.onTaskFinished(task, fail);
- }
-
- /**
- * Starts process template that will be ready to execute Hadoop tasks.
- *
- * @param job Job instance.
- * @param plan Map reduce plan.
- */
- private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) {
- final UUID childProcId = UUID.randomUUID();
-
- GridHadoopJobId jobId = job.id();
-
- final GridHadoopProcessFuture fut = new GridHadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
-
- final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
-
- HadoopProcess old = runningProcsByJobId.put(jobId, proc);
-
- assert old == null;
-
- old = runningProcsByProcId.put(childProcId, proc);
-
- assert old == null;
-
- ctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- if (!busyLock.tryReadLock()) {
- fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
-
- return;
- }
-
- try {
- GridHadoopExternalTaskMetadata startMeta = buildTaskMeta();
-
- if (log.isDebugEnabled())
- log.debug("Created hadoop child process metadata for job [job=" + job +
- ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
-
- Process proc = startJavaProcess(childProcId, startMeta, job);
-
- BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
- String line;
-
- // Read up all the process output.
- while ((line = rdr.readLine()) != null) {
- if (log.isDebugEnabled())
- log.debug("Tracing process output: " + line);
-
- if ("Started".equals(line)) {
- // Process started successfully, it should not write anything more to the output stream.
- if (log.isDebugEnabled())
- log.debug("Successfully started child process [childProcId=" + childProcId +
- ", meta=" + job + ']');
-
- fut.onProcessStarted(proc);
-
- break;
- }
- else if ("Failed".equals(line)) {
- StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
-
- while ((line = rdr.readLine()) != null)
- sb.append(" ").append(line).append("\n");
-
- // Cut last character.
- sb.setLength(sb.length() - 1);
-
- log.warning(sb.toString());
-
- fut.onDone(new IgniteCheckedException(sb.toString()));
-
- break;
- }
- }
- }
- catch (Throwable e) {
- fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
- }
- finally {
- busyLock.readUnlock();
- }
- }
- }, true);
-
- fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
- @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
- try {
- // Make sure there were no exceptions.
- f.get();
-
- prepareForJob(proc, job, plan);
- }
- catch (IgniteCheckedException ignore) {
- // Exception is printed in future's onDone() method.
- }
- }
- });
-
- return proc;
- }
-
- /**
- * Checks that java local command is available.
- *
- * @throws IgniteCheckedException If initialization failed.
- */
- private void initJavaCommand() throws IgniteCheckedException {
- String javaHome = System.getProperty("java.home");
-
- if (javaHome == null)
- javaHome = System.getenv("JAVA_HOME");
-
- if (javaHome == null)
- throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
-
- javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
-
- try {
- Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
-
- Collection<String> out = readProcessOutput(proc);
-
- int res = proc.waitFor();
-
- if (res != 0)
- throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
- "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
-
- if (log.isInfoEnabled()) {
- log.info("Will use java for external task execution: ");
-
- for (String s : out)
- log.info(" " + s);
- }
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to check java for external task execution.", e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
- }
- }
-
- /**
- * Reads process output line-by-line.
- *
- * @param proc Process to read output.
- * @return Read lines.
- * @throws IOException If read failed.
- */
- private Collection<String> readProcessOutput(Process proc) throws IOException {
- BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
- Collection<String> res = new ArrayList<>();
-
- String s;
-
- while ((s = rdr.readLine()) != null)
- res.add(s);
-
- return res;
- }
-
- /**
- * Builds process from metadata.
- *
- * @param childProcId Child process ID.
- * @param startMeta Metadata.
- * @param job Job.
- * @return Started process.
- */
- private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta,
- GridHadoopJob job) throws Exception {
- String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
-
- if (log.isDebugEnabled())
- log.debug("Will write process log output to: " + outFldr);
-
- List<String> cmd = new ArrayList<>();
-
- File workDir = U.resolveWorkDirectory("", false);
-
- cmd.add(javaCmd);
- cmd.addAll(startMeta.jvmOptions());
- cmd.add("-cp");
- cmd.add(buildClasspath(startMeta.classpath()));
- cmd.add(GridHadoopExternalProcessStarter.class.getName());
- cmd.add("-cpid");
- cmd.add(String.valueOf(childProcId));
- cmd.add("-ppid");
- cmd.add(String.valueOf(nodeDesc.processId()));
- cmd.add("-nid");
- cmd.add(String.valueOf(nodeDesc.parentNodeId()));
- cmd.add("-addr");
- cmd.add(nodeDesc.address());
- cmd.add("-tport");
- cmd.add(String.valueOf(nodeDesc.tcpPort()));
- cmd.add("-sport");
- cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
- cmd.add("-out");
- cmd.add(outFldr);
- cmd.add("-wd");
- cmd.add(workDir.getAbsolutePath());
-
- return new ProcessBuilder(cmd)
- .redirectErrorStream(true)
- .directory(workDir)
- .start();
- }
-
- /**
- * Gets job work folder.
- *
- * @param jobId Job ID.
- * @return Job work folder.
- */
- private String jobWorkFolder(GridHadoopJobId jobId) {
- return outputBase + File.separator + "Job_" + jobId;
- }
-
- /**
- * @param cp Classpath collection.
- * @return Classpath string.
- */
- private String buildClasspath(Collection<String> cp) {
- assert !cp.isEmpty();
-
- StringBuilder sb = new StringBuilder();
-
- for (String s : cp)
- sb.append(s).append(pathSep);
-
- sb.setLength(sb.length() - 1);
-
- return sb.toString();
- }
-
- /**
- * Sends job info update request to remote process.
- *
- * @param proc Process to send request to.
- * @param meta Job metadata.
- */
- private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) {
- Map<Integer, GridHadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
-
- int rdcNum = meta.mapReducePlan().reducers();
-
- GridHadoopProcessDescriptor[] addrs = null;
-
- if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
- addrs = new GridHadoopProcessDescriptor[rdcNum];
-
- for (int i = 0; i < rdcNum; i++) {
- GridHadoopProcessDescriptor desc = rdcAddrs.get(i);
-
- assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
-
- addrs[i] = desc;
- }
- }
-
- try {
- comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
- }
- catch (IgniteCheckedException e) {
- if (!proc.terminated()) {
- log.error("Failed to send job state update message to remote child process (will kill the process) " +
- "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
-
- proc.terminate();
- }
- }
- }
-
- /**
- * Sends prepare request to remote process.
- *
- * @param proc Process to send request to.
- * @param job Job.
- * @param plan Map reduce plan.
- */
- private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
- try {
- comm.sendMessage(proc.descriptor(), new GridHadoopPrepareForJobRequest(job.id(), job.info(),
- plan.reducers(), plan.reducers(ctx.localNodeId())));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
- ", plan=" + plan + ']', e);
-
- proc.terminate();
- }
- }
-
- /**
- * Processes task finished message.
- *
- * @param desc Remote process descriptor.
- * @param taskMsg Task finished message.
- */
- private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) {
- HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
- if (proc != null)
- proc.removeTask(taskMsg.taskInfo());
-
- jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
- }
-
- /**
- *
- */
- private class MessageListener implements GridHadoopMessageListener {
- /** {@inheritDoc} */
- @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
- if (!busyLock.tryReadLock())
- return;
-
- try {
- if (msg instanceof GridHadoopProcessStartedAck) {
- HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
- assert proc != null : "Missing child process for processId: " + desc;
-
- GridHadoopProcessFuture fut = proc.initFut;
-
- if (fut != null)
- fut.onReplyReceived(desc);
- // Safety.
- else
- log.warning("Failed to find process start future (will ignore): " + desc);
- }
- else if (msg instanceof GridHadoopTaskFinishedMessage) {
- GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg;
-
- processTaskFinishedMessage(desc, taskMsg);
- }
- else
- log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
- if (!busyLock.tryReadLock())
- return;
-
- try {
- if (desc == null) {
- U.warn(log, "Handshake failed.");
-
- return;
- }
-
- // Notify job tracker about failed tasks.
- HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
- if (proc != null) {
- Collection<GridHadoopTaskInfo> tasks = proc.tasks();
-
- if (!F.isEmpty(tasks)) {
- log.warning("Lost connection with alive process (will terminate): " + desc);
-
- GridHadoopTaskStatus status = new GridHadoopTaskStatus(CRASHED,
- new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
-
- for (GridHadoopTaskInfo info : tasks)
- jobTracker.onTaskFinished(info, status);
-
- runningProcsByJobId.remove(proc.jobId(), proc);
- }
-
- // Safety.
- proc.terminate();
- }
- }
- finally {
- busyLock.readUnlock();
- }
- }
- }
-
- /**
- * Hadoop process.
- */
- private static class HadoopProcess extends ReentrantLock {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- private final GridHadoopJobId jobId;
-
- /** Process. */
- private Process proc;
-
- /** Init future. Completes when process is ready to receive messages. */
- private final GridHadoopProcessFuture initFut;
-
- /** Process descriptor. */
- private GridHadoopProcessDescriptor procDesc;
-
- /** Reducers planned for this process. */
- private Collection<Integer> reducers;
-
- /** Tasks. */
- private final Collection<GridHadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
-
- /** Terminated flag. */
- private volatile boolean terminated;
-
- /**
- * @param jobId Job ID.
- * @param initFut Init future.
- */
- private HadoopProcess(GridHadoopJobId jobId, GridHadoopProcessFuture initFut,
- int[] reducers) {
- this.jobId = jobId;
- this.initFut = initFut;
-
- if (!F.isEmpty(reducers)) {
- this.reducers = new ArrayList<>(reducers.length);
-
- for (int r : reducers)
- this.reducers.add(r);
- }
- }
-
- /**
- * @return Communication process descriptor.
- */
- private GridHadoopProcessDescriptor descriptor() {
- return procDesc;
- }
-
- /**
- * @return Job ID.
- */
- public GridHadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * Initialized callback.
- *
- * @param proc Java process representation.
- * @param procDesc Process descriptor.
- */
- private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) {
- this.proc = proc;
- this.procDesc = procDesc;
- }
-
- /**
- * Terminates process (kills it).
- */
- private void terminate() {
- // Guard against concurrent message sending.
- lock();
-
- try {
- terminated = true;
-
- if (!initFut.isDone())
- initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
- @Override public void apply(
- IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
- proc.destroy();
- }
- });
- else
- proc.destroy();
- }
- finally {
- unlock();
- }
- }
-
- /**
- * @return Terminated flag.
- */
- private boolean terminated() {
- return terminated;
- }
-
- /**
- * Sets process tasks.
- *
- * @param tasks Tasks to set.
- */
- private void addTasks(Collection<GridHadoopTaskInfo> tasks) {
- this.tasks.addAll(tasks);
- }
-
- /**
- * Removes task when it was completed.
- *
- * @param task Task to remove.
- */
- private void removeTask(GridHadoopTaskInfo task) {
- if (tasks != null)
- tasks.remove(task);
- }
-
- /**
- * @return Collection of tasks.
- */
- private Collection<GridHadoopTaskInfo> tasks() {
- return tasks;
- }
-
- /**
- * @return Planned reducers.
- */
- private Collection<Integer> reducers() {
- return reducers;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopProcess.class, this);
- }
- }
-
- /**
- *
- */
- private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Child process ID. */
- private UUID childProcId;
-
- /** Job ID. */
- private GridHadoopJobId jobId;
-
- /** Process descriptor. */
- private GridHadoopProcessDescriptor desc;
-
- /** Running process. */
- private Process proc;
-
- /** Process started flag. */
- private volatile boolean procStarted;
-
- /** Reply received flag. */
- private volatile boolean replyReceived;
-
- /** Logger. */
- private final IgniteLogger log = GridHadoopExternalTaskExecutor.this.log;
-
- /**
- * Empty constructor.
- */
- public GridHadoopProcessFuture() {
- // No-op.
- }
-
- /**
- * @param ctx Kernal context.
- */
- private GridHadoopProcessFuture(UUID childProcId, GridHadoopJobId jobId, GridKernalContext ctx) {
- super(ctx);
-
- this.childProcId = childProcId;
- this.jobId = jobId;
- }
-
- /**
- * Process started callback.
- */
- public void onProcessStarted(Process proc) {
- this.proc = proc;
-
- procStarted = true;
-
- if (procStarted && replyReceived)
- onDone(F.t(proc, desc));
- }
-
- /**
- * Reply received callback.
- */
- public void onReplyReceived(GridHadoopProcessDescriptor desc) {
- assert childProcId.equals(desc.processId());
-
- this.desc = desc;
-
- replyReceived = true;
-
- if (procStarted && replyReceived)
- onDone(F.t(proc, desc));
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable IgniteBiTuple<Process, GridHadoopProcessDescriptor> res,
- @Nullable Throwable err) {
- if (err == null) {
- HadoopProcess proc = runningProcsByProcId.get(childProcId);
-
- assert proc != null;
-
- assert proc.initFut == this;
-
- proc.onInitialized(res.get1(), res.get2());
-
- if (!F.isEmpty(proc.reducers()))
- jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
- }
- else {
- // Clean up since init failed.
- runningProcsByJobId.remove(jobId);
- runningProcsByProcId.remove(childProcId);
- }
-
- if (super.onDone(res, err)) {
- if (err == null) {
- if (log.isDebugEnabled())
- log.debug("Initialized child process for external task execution [jobId=" + jobId +
- ", desc=" + desc + ", initTime=" + duration() + ']');
- }
- else
- U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
- ", desc=" + desc + ']', err);
-
- return true;
- }
-
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
new file mode 100644
index 0000000..616d383
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -0,0 +1,960 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
+
+/**
+ * External process registry. Handles external process lifecycle.
+ */
+public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
+ /** Hadoop context. */
+ private HadoopContext ctx;
+
+ /** */
+ private String javaCmd;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Node process descriptor. */
+ private GridHadoopProcessDescriptor nodeDesc;
+
+ /** Output base. */
+ private File outputBase;
+
+ /** Path separator. */
+ private String pathSep;
+
+ /** Hadoop external communication. */
+ private GridHadoopExternalCommunication comm;
+
+ /** Starting processes. */
+ private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
+
+ /** Starting processes. */
+ private final ConcurrentMap<GridHadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
+
+ /** Busy lock. */
+ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** {@inheritDoc} */
+ @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+ this.ctx = ctx;
+
+ log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
+
+ outputBase = U.resolveWorkDirectory("hadoop", false);
+
+ pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
+
+ initJavaCommand();
+
+ comm = new GridHadoopExternalCommunication(
+ ctx.localNodeId(),
+ UUID.randomUUID(),
+ ctx.kernalContext().config().getMarshaller(),
+ log,
+ ctx.kernalContext().getSystemExecutorService(),
+ ctx.kernalContext().gridName());
+
+ comm.setListener(new MessageListener());
+
+ comm.start();
+
+ nodeDesc = comm.localProcessDescriptor();
+
+ ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
+ HadoopExternalTaskExecutor.class);
+
+ if (nodeDesc.sharedMemoryPort() != -1)
+ ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
+ HadoopExternalTaskExecutor.class);
+
+ jobTracker = ctx.jobTracker();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ busyLock.writeLock();
+
+ try {
+ comm.stop();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJobStateChanged(final GridHadoopJobMetadata meta) {
+ final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
+
+ // If we have a local process for this job.
+ if (proc != null) {
+ if (log.isDebugEnabled())
+ log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
+
+ if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
+ if (log.isDebugEnabled())
+ log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
+ ", proc=" + proc + ']');
+
+ runningProcsByJobId.remove(meta.jobId());
+ runningProcsByProcId.remove(proc.descriptor().processId());
+
+ proc.terminate();
+
+ return;
+ }
+
+ if (proc.initFut.isDone()) {
+ if (!proc.initFut.isFailed())
+ sendJobInfoUpdate(proc, meta);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to initialize child process (will skip job state notification) " +
+ "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
+ }
+ else {
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ try {
+ f.get();
+
+ sendJobInfoUpdate(proc, meta);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to initialize child process (will skip job state notification) " +
+ "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
+ }
+
+ }
+ });
+ }
+ }
+ else if (ctx.isParticipating(meta)) {
+ GridHadoopJob job;
+
+ try {
+ job = jobTracker.job(meta.jobId(), meta.jobInfo());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get job: " + meta.jobId(), e);
+
+ return;
+ }
+
+ startProcess(job, meta.mapReducePlan());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
+ @Override public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
+
+ return;
+ }
+
+ try {
+ HadoopProcess proc = runningProcsByJobId.get(job.id());
+
+ GridHadoopTaskType taskType = F.first(tasks).type();
+
+ if (taskType == GridHadoopTaskType.SETUP || taskType == GridHadoopTaskType.ABORT ||
+ taskType == GridHadoopTaskType.COMMIT) {
+ if (proc == null || proc.terminated()) {
+ runningProcsByJobId.remove(job.id(), proc);
+
+ // Start new process for ABORT task since previous processes were killed.
+ proc = startProcess(job, jobTracker.plan(job.id()));
+
+ if (log.isDebugEnabled())
+ log.debug("Starting new process for maintenance task [jobId=" + job.id() +
+ ", proc=" + proc + ", taskType=" + taskType + ']');
+ }
+ }
+ else
+ assert proc != null : "Missing started process for task execution request: " + job.id() +
+ ", tasks=" + tasks;
+
+ final HadoopProcess proc0 = proc;
+
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ f.get();
+
+ proc0.addTasks(tasks);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending task execution request to child process [jobId=" + job.id() +
+ ", proc=" + proc0 + ", tasks=" + tasks + ']');
+
+ sendExecutionRequest(proc0, job, tasks);
+ }
+ catch (IgniteCheckedException e) {
+ notifyTasksFailed(tasks, FAILED, e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ });
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancelTasks(GridHadoopJobId jobId) {
+ HadoopProcess proc = runningProcsByJobId.get(jobId);
+
+ if (proc != null)
+ proc.terminate();
+ }
+
+ /**
+ * Sends execution request to remote node.
+ *
+ * @param proc Process to send request to.
+ * @param job Job instance.
+ * @param tasks Collection of tasks to execute in started process.
+ */
+ private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks)
+ throws IgniteCheckedException {
+ // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
+ proc.lock();
+
+ try {
+ if (proc.terminated()) {
+ notifyTasksFailed(tasks, CRASHED, null);
+
+ return;
+ }
+
+ GridHadoopTaskExecutionRequest req = new GridHadoopTaskExecutionRequest();
+
+ req.jobId(job.id());
+ req.jobInfo(job.info());
+ req.tasks(tasks);
+
+ comm.sendMessage(proc.descriptor(), req);
+ }
+ finally {
+ proc.unlock();
+ }
+ }
+
+ /**
+ * @return External task metadata.
+ */
+ private GridHadoopExternalTaskMetadata buildTaskMeta() {
+ GridHadoopExternalTaskMetadata meta = new GridHadoopExternalTaskMetadata();
+
+ meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
+ meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
+ "-DIGNITE_HOME=" + U.getIgniteHome()));
+
+ return meta;
+ }
+
+ /**
+ * @param tasks Tasks to notify about.
+ * @param state Fail state.
+ * @param e Optional error.
+ */
+ private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) {
+ GridHadoopTaskStatus fail = new GridHadoopTaskStatus(state, e);
+
+ for (GridHadoopTaskInfo task : tasks)
+ jobTracker.onTaskFinished(task, fail);
+ }
+
+ /**
+ * Starts process template that will be ready to execute Hadoop tasks.
+ *
+ * @param job Job instance.
+ * @param plan Map reduce plan.
+ */
+ private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) {
+ final UUID childProcId = UUID.randomUUID();
+
+ GridHadoopJobId jobId = job.id();
+
+ final GridHadoopProcessFuture fut = new GridHadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
+
+ final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
+
+ HadoopProcess old = runningProcsByJobId.put(jobId, proc);
+
+ assert old == null;
+
+ old = runningProcsByProcId.put(childProcId, proc);
+
+ assert old == null;
+
+ ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ if (!busyLock.tryReadLock()) {
+ fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
+
+ return;
+ }
+
+ try {
+ GridHadoopExternalTaskMetadata startMeta = buildTaskMeta();
+
+ if (log.isDebugEnabled())
+ log.debug("Created hadoop child process metadata for job [job=" + job +
+ ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
+
+ Process proc = startJavaProcess(childProcId, startMeta, job);
+
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+ String line;
+
+ // Read up all the process output.
+ while ((line = rdr.readLine()) != null) {
+ if (log.isDebugEnabled())
+ log.debug("Tracing process output: " + line);
+
+ if ("Started".equals(line)) {
+ // Process started successfully, it should not write anything more to the output stream.
+ if (log.isDebugEnabled())
+ log.debug("Successfully started child process [childProcId=" + childProcId +
+ ", meta=" + job + ']');
+
+ fut.onProcessStarted(proc);
+
+ break;
+ }
+ else if ("Failed".equals(line)) {
+ StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
+
+ while ((line = rdr.readLine()) != null)
+ sb.append(" ").append(line).append("\n");
+
+ // Cut last character.
+ sb.setLength(sb.length() - 1);
+
+ log.warning(sb.toString());
+
+ fut.onDone(new IgniteCheckedException(sb.toString()));
+
+ break;
+ }
+ }
+ }
+ catch (Throwable e) {
+ fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }, true);
+
+ fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ try {
+ // Make sure there were no exceptions.
+ f.get();
+
+ prepareForJob(proc, job, plan);
+ }
+ catch (IgniteCheckedException ignore) {
+ // Exception is printed in future's onDone() method.
+ }
+ }
+ });
+
+ return proc;
+ }
+
+ /**
+ * Checks that java local command is available.
+ *
+ * @throws IgniteCheckedException If initialization failed.
+ */
+ private void initJavaCommand() throws IgniteCheckedException {
+ String javaHome = System.getProperty("java.home");
+
+ if (javaHome == null)
+ javaHome = System.getenv("JAVA_HOME");
+
+ if (javaHome == null)
+ throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
+
+ javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
+
+ try {
+ Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
+
+ Collection<String> out = readProcessOutput(proc);
+
+ int res = proc.waitFor();
+
+ if (res != 0)
+ throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
+ "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
+
+ if (log.isInfoEnabled()) {
+ log.info("Will use java for external task execution: ");
+
+ for (String s : out)
+ log.info(" " + s);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to check java for external task execution.", e);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
+ }
+ }
+
+ /**
+ * Reads process output line-by-line.
+ *
+ * @param proc Process to read output.
+ * @return Read lines.
+ * @throws IOException If read failed.
+ */
+ private Collection<String> readProcessOutput(Process proc) throws IOException {
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+ Collection<String> res = new ArrayList<>();
+
+ String s;
+
+ while ((s = rdr.readLine()) != null)
+ res.add(s);
+
+ return res;
+ }
+
+ /**
+ * Builds process from metadata.
+ *
+ * @param childProcId Child process ID.
+ * @param startMeta Metadata.
+ * @param job Job.
+ * @return Started process.
+ */
+ private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta,
+ GridHadoopJob job) throws Exception {
+ String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
+
+ if (log.isDebugEnabled())
+ log.debug("Will write process log output to: " + outFldr);
+
+ List<String> cmd = new ArrayList<>();
+
+ File workDir = U.resolveWorkDirectory("", false);
+
+ cmd.add(javaCmd);
+ cmd.addAll(startMeta.jvmOptions());
+ cmd.add("-cp");
+ cmd.add(buildClasspath(startMeta.classpath()));
+ cmd.add(GridHadoopExternalProcessStarter.class.getName());
+ cmd.add("-cpid");
+ cmd.add(String.valueOf(childProcId));
+ cmd.add("-ppid");
+ cmd.add(String.valueOf(nodeDesc.processId()));
+ cmd.add("-nid");
+ cmd.add(String.valueOf(nodeDesc.parentNodeId()));
+ cmd.add("-addr");
+ cmd.add(nodeDesc.address());
+ cmd.add("-tport");
+ cmd.add(String.valueOf(nodeDesc.tcpPort()));
+ cmd.add("-sport");
+ cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
+ cmd.add("-out");
+ cmd.add(outFldr);
+ cmd.add("-wd");
+ cmd.add(workDir.getAbsolutePath());
+
+ return new ProcessBuilder(cmd)
+ .redirectErrorStream(true)
+ .directory(workDir)
+ .start();
+ }
+
+ /**
+ * Gets job work folder.
+ *
+ * @param jobId Job ID.
+ * @return Job work folder.
+ */
+ private String jobWorkFolder(GridHadoopJobId jobId) {
+ return outputBase + File.separator + "Job_" + jobId;
+ }
+
+ /**
+ * @param cp Classpath collection.
+ * @return Classpath string.
+ */
+ private String buildClasspath(Collection<String> cp) {
+ assert !cp.isEmpty();
+
+ StringBuilder sb = new StringBuilder();
+
+ for (String s : cp)
+ sb.append(s).append(pathSep);
+
+ sb.setLength(sb.length() - 1);
+
+ return sb.toString();
+ }
+
+ /**
+ * Sends job info update request to remote process.
+ *
+ * @param proc Process to send request to.
+ * @param meta Job metadata.
+ */
+ private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) {
+ Map<Integer, GridHadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
+
+ int rdcNum = meta.mapReducePlan().reducers();
+
+ GridHadoopProcessDescriptor[] addrs = null;
+
+ if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
+ addrs = new GridHadoopProcessDescriptor[rdcNum];
+
+ for (int i = 0; i < rdcNum; i++) {
+ GridHadoopProcessDescriptor desc = rdcAddrs.get(i);
+
+ assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
+
+ addrs[i] = desc;
+ }
+ }
+
+ try {
+ comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+ }
+ catch (IgniteCheckedException e) {
+ if (!proc.terminated()) {
+ log.error("Failed to send job state update message to remote child process (will kill the process) " +
+ "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
+
+ proc.terminate();
+ }
+ }
+ }
+
+ /**
+ * Sends prepare request to remote process.
+ *
+ * @param proc Process to send request to.
+ * @param job Job.
+ * @param plan Map reduce plan.
+ */
+ private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
+ try {
+ comm.sendMessage(proc.descriptor(), new GridHadoopPrepareForJobRequest(job.id(), job.info(),
+ plan.reducers(), plan.reducers(ctx.localNodeId())));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
+ ", plan=" + plan + ']', e);
+
+ proc.terminate();
+ }
+ }
+
+ /**
+ * Processes task finished message.
+ *
+ * @param desc Remote process descriptor.
+ * @param taskMsg Task finished message.
+ */
+ private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) {
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ if (proc != null)
+ proc.removeTask(taskMsg.taskInfo());
+
+ jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
+ }
+
+ /**
+ *
+ */
+ private class MessageListener implements GridHadoopMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ if (msg instanceof GridHadoopProcessStartedAck) {
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ assert proc != null : "Missing child process for processId: " + desc;
+
+ GridHadoopProcessFuture fut = proc.initFut;
+
+ if (fut != null)
+ fut.onReplyReceived(desc);
+ // Safety.
+ else
+ log.warning("Failed to find process start future (will ignore): " + desc);
+ }
+ else if (msg instanceof GridHadoopTaskFinishedMessage) {
+ GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg;
+
+ processTaskFinishedMessage(desc, taskMsg);
+ }
+ else
+ log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ if (desc == null) {
+ U.warn(log, "Handshake failed.");
+
+ return;
+ }
+
+ // Notify job tracker about failed tasks.
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ if (proc != null) {
+ Collection<GridHadoopTaskInfo> tasks = proc.tasks();
+
+ if (!F.isEmpty(tasks)) {
+ log.warning("Lost connection with alive process (will terminate): " + desc);
+
+ GridHadoopTaskStatus status = new GridHadoopTaskStatus(CRASHED,
+ new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
+
+ for (GridHadoopTaskInfo info : tasks)
+ jobTracker.onTaskFinished(info, status);
+
+ runningProcsByJobId.remove(proc.jobId(), proc);
+ }
+
+ // Safety.
+ proc.terminate();
+ }
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }
+
+ /**
+ * Hadoop process.
+ */
+ private static class HadoopProcess extends ReentrantLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ private final GridHadoopJobId jobId;
+
+ /** Process. */
+ private Process proc;
+
+ /** Init future. Completes when process is ready to receive messages. */
+ private final GridHadoopProcessFuture initFut;
+
+ /** Process descriptor. */
+ private GridHadoopProcessDescriptor procDesc;
+
+ /** Reducers planned for this process. */
+ private Collection<Integer> reducers;
+
+ /** Tasks. */
+ private final Collection<GridHadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
+
+ /** Terminated flag. */
+ private volatile boolean terminated;
+
+ /**
+ * @param jobId Job ID.
+ * @param initFut Init future.
+ */
+ private HadoopProcess(GridHadoopJobId jobId, GridHadoopProcessFuture initFut,
+ int[] reducers) {
+ this.jobId = jobId;
+ this.initFut = initFut;
+
+ if (!F.isEmpty(reducers)) {
+ this.reducers = new ArrayList<>(reducers.length);
+
+ for (int r : reducers)
+ this.reducers.add(r);
+ }
+ }
+
+ /**
+ * @return Communication process descriptor.
+ */
+ private GridHadoopProcessDescriptor descriptor() {
+ return procDesc;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * Initialized callback.
+ *
+ * @param proc Java process representation.
+ * @param procDesc Process descriptor.
+ */
+ private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) {
+ this.proc = proc;
+ this.procDesc = procDesc;
+ }
+
+ /**
+ * Terminates process (kills it).
+ */
+ private void terminate() {
+ // Guard against concurrent message sending.
+ lock();
+
+ try {
+ terminated = true;
+
+ if (!initFut.isDone())
+ initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ proc.destroy();
+ }
+ });
+ else
+ proc.destroy();
+ }
+ finally {
+ unlock();
+ }
+ }
+
+ /**
+ * @return Terminated flag.
+ */
+ private boolean terminated() {
+ return terminated;
+ }
+
+ /**
+ * Sets process tasks.
+ *
+ * @param tasks Tasks to set.
+ */
+ private void addTasks(Collection<GridHadoopTaskInfo> tasks) {
+ this.tasks.addAll(tasks);
+ }
+
+ /**
+ * Removes task when it was completed.
+ *
+ * @param task Task to remove.
+ */
+ private void removeTask(GridHadoopTaskInfo task) {
+ if (tasks != null)
+ tasks.remove(task);
+ }
+
+ /**
+ * @return Collection of tasks.
+ */
+ private Collection<GridHadoopTaskInfo> tasks() {
+ return tasks;
+ }
+
+ /**
+ * @return Planned reducers.
+ */
+ private Collection<Integer> reducers() {
+ return reducers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcess.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Child process ID. */
+ private UUID childProcId;
+
+ /** Job ID. */
+ private GridHadoopJobId jobId;
+
+ /** Process descriptor. */
+ private GridHadoopProcessDescriptor desc;
+
+ /** Running process. */
+ private Process proc;
+
+ /** Process started flag. */
+ private volatile boolean procStarted;
+
+ /** Reply received flag. */
+ private volatile boolean replyReceived;
+
+ /** Logger. */
+ private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
+
+ /**
+ * Empty constructor.
+ */
+ public GridHadoopProcessFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Kernal context.
+ */
+ private GridHadoopProcessFuture(UUID childProcId, GridHadoopJobId jobId, GridKernalContext ctx) {
+ super(ctx);
+
+ this.childProcId = childProcId;
+ this.jobId = jobId;
+ }
+
+ /**
+ * Process started callback.
+ */
+ public void onProcessStarted(Process proc) {
+ this.proc = proc;
+
+ procStarted = true;
+
+ if (procStarted && replyReceived)
+ onDone(F.t(proc, desc));
+ }
+
+ /**
+ * Reply received callback.
+ */
+ public void onReplyReceived(GridHadoopProcessDescriptor desc) {
+ assert childProcId.equals(desc.processId());
+
+ this.desc = desc;
+
+ replyReceived = true;
+
+ if (procStarted && replyReceived)
+ onDone(F.t(proc, desc));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable IgniteBiTuple<Process, GridHadoopProcessDescriptor> res,
+ @Nullable Throwable err) {
+ if (err == null) {
+ HadoopProcess proc = runningProcsByProcId.get(childProcId);
+
+ assert proc != null;
+
+ assert proc.initFut == this;
+
+ proc.onInitialized(res.get1(), res.get2());
+
+ if (!F.isEmpty(proc.reducers()))
+ jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
+ }
+ else {
+ // Clean up since init failed.
+ runningProcsByJobId.remove(jobId);
+ runningProcsByProcId.remove(childProcId);
+ }
+
+ if (super.onDone(res, err)) {
+ if (err == null) {
+ if (log.isDebugEnabled())
+ log.debug("Initialized child process for external task execution [jobId=" + jobId +
+ ", desc=" + desc + ", initTime=" + duration() + ']');
+ }
+ else
+ U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
+ ", desc=" + desc + ']', err);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
index 878b61b..da59483 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
@@ -86,7 +86,7 @@ public class GridHadoopV1MapTask extends GridHadoopV1Task {
try {
while (reader.next(key, val)) {
if (isCancelled())
- throw new GridHadoopTaskCancelledException("Map task cancelled.");
+ throw new HadoopTaskCancelledException("Map task cancelled.");
mapper.map(key, val, collector, reporter);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
index 7deea90..3aca637 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
@@ -67,7 +67,7 @@ public class GridHadoopV1ReduceTask extends GridHadoopV1Task {
try {
while (input.next()) {
if (isCancelled())
- throw new GridHadoopTaskCancelledException("Reduce task cancelled.");
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
index 257f4ea..0e1fb44 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
@@ -58,7 +58,7 @@ public class GridHadoopV1Splitter {
res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
}
else
- res.add(GridHadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+ res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
}
return res;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
index 86a7264..305bc4e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
@@ -72,7 +72,7 @@ public abstract class GridHadoopV1Task extends GridHadoopTask {
/** {@inheritDoc} */
@Override public void collect(Object key, Object val) throws IOException {
if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
+ throw new HadoopTaskCancelledException("Task cancelled.");
super.collect(key, val);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
index 287b10f..160e34b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
@@ -92,7 +92,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
else if (split instanceof GridHadoopExternalSplit)
throw new UnsupportedOperationException(); // TODO
else if (split instanceof GridHadoopSplitWrapper)
- inputSplit = (InputSplit)GridHadoopUtils.unwrapSplit((GridHadoopSplitWrapper)split);
+ inputSplit = (InputSplit) HadoopUtils.unwrapSplit((GridHadoopSplitWrapper) split);
else
throw new IllegalStateException();
}
@@ -103,7 +103,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
/** {@inheritDoc} */
@Override public boolean nextKeyValue() throws IOException, InterruptedException {
if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
+ throw new HadoopTaskCancelledException("Task cancelled.");
return reader.nextKeyValue();
}
@@ -125,7 +125,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
@SuppressWarnings("unchecked")
@Override public void write(Object key, Object val) throws IOException, InterruptedException {
if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
+ throw new HadoopTaskCancelledException("Task cancelled.");
if (writer != null)
writer.write(key, val);
@@ -191,7 +191,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
/** {@inheritDoc} */
@Override public boolean nextKey() throws IOException, InterruptedException {
if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
+ throw new HadoopTaskCancelledException("Task cancelled.");
return input.next();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
index 7c36948..5f1af22 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
@@ -39,7 +39,7 @@ import java.util.*;
import java.util.Queue;
import java.util.concurrent.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
/**
* Hadoop job implementation for v2 API.
@@ -81,7 +81,7 @@ public class GridHadoopV2Job implements GridHadoopJob {
* @param jobInfo Job info.
* @param log Logger.
*/
- public GridHadoopV2Job(GridHadoopJobId jobId, final GridHadoopDefaultJobInfo jobInfo, IgniteLogger log) {
+ public GridHadoopV2Job(GridHadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) {
assert jobId != null;
assert jobInfo != null;
@@ -90,7 +90,7 @@ public class GridHadoopV2Job implements GridHadoopJob {
hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
- GridHadoopClassLoader clsLdr = (GridHadoopClassLoader)getClass().getClassLoader();
+ HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
// Before create JobConf instance we should set new context class loader.
Thread.currentThread().setContextClassLoader(clsLdr);
@@ -196,7 +196,7 @@ public class GridHadoopV2Job implements GridHadoopJob {
try {
if (cls == null) {
// If there is no pooled class, then load new one.
- GridHadoopClassLoader ldr = new GridHadoopClassLoader(rsrcMgr.classPath());
+ HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath());
cls = ldr.loadClass(GridHadoopV2TaskContext.class.getName());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
index e8ce70b..68338a6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
@@ -59,7 +59,7 @@ public class GridHadoopV2Splitter {
res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
}
else
- res.add(GridHadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations()));
+ res.add(HadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations()));
id++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
index 82be91f..3e88362 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
@@ -40,7 +40,7 @@ import java.io.*;
import java.util.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
/**
* Context for task execution.
@@ -186,7 +186,7 @@ public class GridHadoopV2TaskContext extends GridHadoopTaskContext {
}
if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
+ throw new HadoopTaskCancelledException("Task cancelled.");
try {
task.run(this);