You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/04 16:35:23 UTC
[21/45] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
new file mode 100644
index 0000000..04a96de
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -0,0 +1,960 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
+
+/**
+ * External process registry. Handles external process lifecycle.
+ */
+public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
+ /** Hadoop context. */
+ private HadoopContext ctx;
+
+ /** */
+ private String javaCmd;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Node process descriptor. */
+ private HadoopProcessDescriptor nodeDesc;
+
+ /** Output base. */
+ private File outputBase;
+
+ /** Path separator. */
+ private String pathSep;
+
+ /** Hadoop external communication. */
+ private HadoopExternalCommunication comm;
+
+ /** Starting processes. */
+ private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
+
+ /** Starting processes. */
+ private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
+
+ /** Busy lock. */
+ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** {@inheritDoc} */
+ @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+ this.ctx = ctx;
+
+ log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
+
+ outputBase = U.resolveWorkDirectory("hadoop", false);
+
+ pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
+
+ initJavaCommand();
+
+ comm = new HadoopExternalCommunication(
+ ctx.localNodeId(),
+ UUID.randomUUID(),
+ ctx.kernalContext().config().getMarshaller(),
+ log,
+ ctx.kernalContext().getSystemExecutorService(),
+ ctx.kernalContext().gridName());
+
+ comm.setListener(new MessageListener());
+
+ comm.start();
+
+ nodeDesc = comm.localProcessDescriptor();
+
+ ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
+ HadoopExternalTaskExecutor.class);
+
+ if (nodeDesc.sharedMemoryPort() != -1)
+ ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
+ HadoopExternalTaskExecutor.class);
+
+ jobTracker = ctx.jobTracker();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ busyLock.writeLock();
+
+ try {
+ comm.stop();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
+ final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
+
+ // If we have a local process for this job.
+ if (proc != null) {
+ if (log.isDebugEnabled())
+ log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
+
+ if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+ if (log.isDebugEnabled())
+ log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
+ ", proc=" + proc + ']');
+
+ runningProcsByJobId.remove(meta.jobId());
+ runningProcsByProcId.remove(proc.descriptor().processId());
+
+ proc.terminate();
+
+ return;
+ }
+
+ if (proc.initFut.isDone()) {
+ if (!proc.initFut.isFailed())
+ sendJobInfoUpdate(proc, meta);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to initialize child process (will skip job state notification) " +
+ "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
+ }
+ else {
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ try {
+ f.get();
+
+ sendJobInfoUpdate(proc, meta);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to initialize child process (will skip job state notification) " +
+ "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
+ }
+
+ }
+ });
+ }
+ }
+ else if (ctx.isParticipating(meta)) {
+ HadoopJob job;
+
+ try {
+ job = jobTracker.job(meta.jobId(), meta.jobInfo());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get job: " + meta.jobId(), e);
+
+ return;
+ }
+
+ startProcess(job, meta.mapReducePlan());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
+ @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
+
+ return;
+ }
+
+ try {
+ HadoopProcess proc = runningProcsByJobId.get(job.id());
+
+ HadoopTaskType taskType = F.first(tasks).type();
+
+ if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT ||
+ taskType == HadoopTaskType.COMMIT) {
+ if (proc == null || proc.terminated()) {
+ runningProcsByJobId.remove(job.id(), proc);
+
+ // Start new process for ABORT task since previous processes were killed.
+ proc = startProcess(job, jobTracker.plan(job.id()));
+
+ if (log.isDebugEnabled())
+ log.debug("Starting new process for maintenance task [jobId=" + job.id() +
+ ", proc=" + proc + ", taskType=" + taskType + ']');
+ }
+ }
+ else
+ assert proc != null : "Missing started process for task execution request: " + job.id() +
+ ", tasks=" + tasks;
+
+ final HadoopProcess proc0 = proc;
+
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ f.get();
+
+ proc0.addTasks(tasks);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending task execution request to child process [jobId=" + job.id() +
+ ", proc=" + proc0 + ", tasks=" + tasks + ']');
+
+ sendExecutionRequest(proc0, job, tasks);
+ }
+ catch (IgniteCheckedException e) {
+ notifyTasksFailed(tasks, FAILED, e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ });
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancelTasks(HadoopJobId jobId) {
+ HadoopProcess proc = runningProcsByJobId.get(jobId);
+
+ if (proc != null)
+ proc.terminate();
+ }
+
+ /**
+ * Sends execution request to remote node.
+ *
+ * @param proc Process to send request to.
+ * @param job Job instance.
+ * @param tasks Collection of tasks to execute in started process.
+ */
+ private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks)
+ throws IgniteCheckedException {
+ // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
+ proc.lock();
+
+ try {
+ if (proc.terminated()) {
+ notifyTasksFailed(tasks, CRASHED, null);
+
+ return;
+ }
+
+ HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
+
+ req.jobId(job.id());
+ req.jobInfo(job.info());
+ req.tasks(tasks);
+
+ comm.sendMessage(proc.descriptor(), req);
+ }
+ finally {
+ proc.unlock();
+ }
+ }
+
+ /**
+ * @return External task metadata.
+ */
+ private HadoopExternalTaskMetadata buildTaskMeta() {
+ HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata();
+
+ meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
+ meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
+ "-DIGNITE_HOME=" + U.getIgniteHome()));
+
+ return meta;
+ }
+
+ /**
+ * @param tasks Tasks to notify about.
+ * @param state Fail state.
+ * @param e Optional error.
+ */
+ private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
+ HadoopTaskStatus fail = new HadoopTaskStatus(state, e);
+
+ for (HadoopTaskInfo task : tasks)
+ jobTracker.onTaskFinished(task, fail);
+ }
+
+ /**
+ * Starts process template that will be ready to execute Hadoop tasks.
+ *
+ * @param job Job instance.
+ * @param plan Map reduce plan.
+ */
+ private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) {
+ final UUID childProcId = UUID.randomUUID();
+
+ HadoopJobId jobId = job.id();
+
+ final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
+
+ final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
+
+ HadoopProcess old = runningProcsByJobId.put(jobId, proc);
+
+ assert old == null;
+
+ old = runningProcsByProcId.put(childProcId, proc);
+
+ assert old == null;
+
+ ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ if (!busyLock.tryReadLock()) {
+ fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
+
+ return;
+ }
+
+ try {
+ HadoopExternalTaskMetadata startMeta = buildTaskMeta();
+
+ if (log.isDebugEnabled())
+ log.debug("Created hadoop child process metadata for job [job=" + job +
+ ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
+
+ Process proc = startJavaProcess(childProcId, startMeta, job);
+
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+ String line;
+
+ // Read up all the process output.
+ while ((line = rdr.readLine()) != null) {
+ if (log.isDebugEnabled())
+ log.debug("Tracing process output: " + line);
+
+ if ("Started".equals(line)) {
+ // Process started successfully, it should not write anything more to the output stream.
+ if (log.isDebugEnabled())
+ log.debug("Successfully started child process [childProcId=" + childProcId +
+ ", meta=" + job + ']');
+
+ fut.onProcessStarted(proc);
+
+ break;
+ }
+ else if ("Failed".equals(line)) {
+ StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
+
+ while ((line = rdr.readLine()) != null)
+ sb.append(" ").append(line).append("\n");
+
+ // Cut last character.
+ sb.setLength(sb.length() - 1);
+
+ log.warning(sb.toString());
+
+ fut.onDone(new IgniteCheckedException(sb.toString()));
+
+ break;
+ }
+ }
+ }
+ catch (Throwable e) {
+ fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }, true);
+
+ fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ try {
+ // Make sure there were no exceptions.
+ f.get();
+
+ prepareForJob(proc, job, plan);
+ }
+ catch (IgniteCheckedException ignore) {
+ // Exception is printed in future's onDone() method.
+ }
+ }
+ });
+
+ return proc;
+ }
+
+ /**
+ * Checks that java local command is available.
+ *
+ * @throws IgniteCheckedException If initialization failed.
+ */
+ private void initJavaCommand() throws IgniteCheckedException {
+ String javaHome = System.getProperty("java.home");
+
+ if (javaHome == null)
+ javaHome = System.getenv("JAVA_HOME");
+
+ if (javaHome == null)
+ throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
+
+ javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
+
+ try {
+ Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
+
+ Collection<String> out = readProcessOutput(proc);
+
+ int res = proc.waitFor();
+
+ if (res != 0)
+ throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
+ "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
+
+ if (log.isInfoEnabled()) {
+ log.info("Will use java for external task execution: ");
+
+ for (String s : out)
+ log.info(" " + s);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to check java for external task execution.", e);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
+ }
+ }
+
+ /**
+ * Reads process output line-by-line.
+ *
+ * @param proc Process to read output.
+ * @return Read lines.
+ * @throws IOException If read failed.
+ */
+ private Collection<String> readProcessOutput(Process proc) throws IOException {
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+ Collection<String> res = new ArrayList<>();
+
+ String s;
+
+ while ((s = rdr.readLine()) != null)
+ res.add(s);
+
+ return res;
+ }
+
+ /**
+ * Builds process from metadata.
+ *
+ * @param childProcId Child process ID.
+ * @param startMeta Metadata.
+ * @param job Job.
+ * @return Started process.
+ */
+ private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
+ HadoopJob job) throws Exception {
+ String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
+
+ if (log.isDebugEnabled())
+ log.debug("Will write process log output to: " + outFldr);
+
+ List<String> cmd = new ArrayList<>();
+
+ File workDir = U.resolveWorkDirectory("", false);
+
+ cmd.add(javaCmd);
+ cmd.addAll(startMeta.jvmOptions());
+ cmd.add("-cp");
+ cmd.add(buildClasspath(startMeta.classpath()));
+ cmd.add(HadoopExternalProcessStarter.class.getName());
+ cmd.add("-cpid");
+ cmd.add(String.valueOf(childProcId));
+ cmd.add("-ppid");
+ cmd.add(String.valueOf(nodeDesc.processId()));
+ cmd.add("-nid");
+ cmd.add(String.valueOf(nodeDesc.parentNodeId()));
+ cmd.add("-addr");
+ cmd.add(nodeDesc.address());
+ cmd.add("-tport");
+ cmd.add(String.valueOf(nodeDesc.tcpPort()));
+ cmd.add("-sport");
+ cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
+ cmd.add("-out");
+ cmd.add(outFldr);
+ cmd.add("-wd");
+ cmd.add(workDir.getAbsolutePath());
+
+ return new ProcessBuilder(cmd)
+ .redirectErrorStream(true)
+ .directory(workDir)
+ .start();
+ }
+
+ /**
+ * Gets job work folder.
+ *
+ * @param jobId Job ID.
+ * @return Job work folder.
+ */
+ private String jobWorkFolder(HadoopJobId jobId) {
+ return outputBase + File.separator + "Job_" + jobId;
+ }
+
+ /**
+ * @param cp Classpath collection.
+ * @return Classpath string.
+ */
+ private String buildClasspath(Collection<String> cp) {
+ assert !cp.isEmpty();
+
+ StringBuilder sb = new StringBuilder();
+
+ for (String s : cp)
+ sb.append(s).append(pathSep);
+
+ sb.setLength(sb.length() - 1);
+
+ return sb.toString();
+ }
+
+ /**
+ * Sends job info update request to remote process.
+ *
+ * @param proc Process to send request to.
+ * @param meta Job metadata.
+ */
+ private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) {
+ Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
+
+ int rdcNum = meta.mapReducePlan().reducers();
+
+ HadoopProcessDescriptor[] addrs = null;
+
+ if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
+ addrs = new HadoopProcessDescriptor[rdcNum];
+
+ for (int i = 0; i < rdcNum; i++) {
+ HadoopProcessDescriptor desc = rdcAddrs.get(i);
+
+ assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
+
+ addrs[i] = desc;
+ }
+ }
+
+ try {
+ comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+ }
+ catch (IgniteCheckedException e) {
+ if (!proc.terminated()) {
+ log.error("Failed to send job state update message to remote child process (will kill the process) " +
+ "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
+
+ proc.terminate();
+ }
+ }
+ }
+
+ /**
+ * Sends prepare request to remote process.
+ *
+ * @param proc Process to send request to.
+ * @param job Job.
+ * @param plan Map reduce plan.
+ */
+ private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) {
+ try {
+ comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
+ plan.reducers(), plan.reducers(ctx.localNodeId())));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
+ ", plan=" + plan + ']', e);
+
+ proc.terminate();
+ }
+ }
+
+ /**
+ * Processes task finished message.
+ *
+ * @param desc Remote process descriptor.
+ * @param taskMsg Task finished message.
+ */
+ private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ if (proc != null)
+ proc.removeTask(taskMsg.taskInfo());
+
+ jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
+ }
+
+ /**
+ *
+ */
+ private class MessageListener implements HadoopMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ if (msg instanceof HadoopProcessStartedAck) {
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ assert proc != null : "Missing child process for processId: " + desc;
+
+ HadoopProcessFuture fut = proc.initFut;
+
+ if (fut != null)
+ fut.onReplyReceived(desc);
+ // Safety.
+ else
+ log.warning("Failed to find process start future (will ignore): " + desc);
+ }
+ else if (msg instanceof HadoopTaskFinishedMessage) {
+ HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg;
+
+ processTaskFinishedMessage(desc, taskMsg);
+ }
+ else
+ log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ if (desc == null) {
+ U.warn(log, "Handshake failed.");
+
+ return;
+ }
+
+ // Notify job tracker about failed tasks.
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ if (proc != null) {
+ Collection<HadoopTaskInfo> tasks = proc.tasks();
+
+ if (!F.isEmpty(tasks)) {
+ log.warning("Lost connection with alive process (will terminate): " + desc);
+
+ HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
+ new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
+
+ for (HadoopTaskInfo info : tasks)
+ jobTracker.onTaskFinished(info, status);
+
+ runningProcsByJobId.remove(proc.jobId(), proc);
+ }
+
+ // Safety.
+ proc.terminate();
+ }
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }
+
+ /**
+ * Hadoop process.
+ */
+ private static class HadoopProcess extends ReentrantLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ private final HadoopJobId jobId;
+
+ /** Process. */
+ private Process proc;
+
+ /** Init future. Completes when process is ready to receive messages. */
+ private final HadoopProcessFuture initFut;
+
+ /** Process descriptor. */
+ private HadoopProcessDescriptor procDesc;
+
+ /** Reducers planned for this process. */
+ private Collection<Integer> reducers;
+
+ /** Tasks. */
+ private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
+
+ /** Terminated flag. */
+ private volatile boolean terminated;
+
+ /**
+ * @param jobId Job ID.
+ * @param initFut Init future.
+ */
+ private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut,
+ int[] reducers) {
+ this.jobId = jobId;
+ this.initFut = initFut;
+
+ if (!F.isEmpty(reducers)) {
+ this.reducers = new ArrayList<>(reducers.length);
+
+ for (int r : reducers)
+ this.reducers.add(r);
+ }
+ }
+
+ /**
+ * @return Communication process descriptor.
+ */
+ private HadoopProcessDescriptor descriptor() {
+ return procDesc;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * Initialized callback.
+ *
+ * @param proc Java process representation.
+ * @param procDesc Process descriptor.
+ */
+ private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
+ this.proc = proc;
+ this.procDesc = procDesc;
+ }
+
+ /**
+ * Terminates process (kills it).
+ */
+ private void terminate() {
+ // Guard against concurrent message sending.
+ lock();
+
+ try {
+ terminated = true;
+
+ if (!initFut.isDone())
+ initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ proc.destroy();
+ }
+ });
+ else
+ proc.destroy();
+ }
+ finally {
+ unlock();
+ }
+ }
+
+ /**
+ * @return Terminated flag.
+ */
+ private boolean terminated() {
+ return terminated;
+ }
+
+ /**
+ * Sets process tasks.
+ *
+ * @param tasks Tasks to set.
+ */
+ private void addTasks(Collection<HadoopTaskInfo> tasks) {
+ this.tasks.addAll(tasks);
+ }
+
+ /**
+ * Removes task when it was completed.
+ *
+ * @param task Task to remove.
+ */
+ private void removeTask(HadoopTaskInfo task) {
+ if (tasks != null)
+ tasks.remove(task);
+ }
+
+ /**
+ * @return Collection of tasks.
+ */
+ private Collection<HadoopTaskInfo> tasks() {
+ return tasks;
+ }
+
+ /**
+ * @return Planned reducers.
+ */
+ private Collection<Integer> reducers() {
+ return reducers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcess.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Child process ID. */
+ private UUID childProcId;
+
+ /** Job ID. */
+ private HadoopJobId jobId;
+
+ /** Process descriptor. */
+ private HadoopProcessDescriptor desc;
+
+ /** Running process. */
+ private Process proc;
+
+ /** Process started flag. */
+ private volatile boolean procStarted;
+
+ /** Reply received flag. */
+ private volatile boolean replyReceived;
+
+ /** Logger. */
+ private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
+
+ /**
+ * Empty constructor.
+ */
+ public HadoopProcessFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Kernal context.
+ */
+ private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) {
+ super(ctx);
+
+ this.childProcId = childProcId;
+ this.jobId = jobId;
+ }
+
+ /**
+ * Process started callback.
+ */
+ public void onProcessStarted(Process proc) {
+ this.proc = proc;
+
+ procStarted = true;
+
+ if (procStarted && replyReceived)
+ onDone(F.t(proc, desc));
+ }
+
+ /**
+ * Reply received callback.
+ */
+ public void onReplyReceived(HadoopProcessDescriptor desc) {
+ assert childProcId.equals(desc.processId());
+
+ this.desc = desc;
+
+ replyReceived = true;
+
+ if (procStarted && replyReceived)
+ onDone(F.t(proc, desc));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res,
+ @Nullable Throwable err) {
+ if (err == null) {
+ HadoopProcess proc = runningProcsByProcId.get(childProcId);
+
+ assert proc != null;
+
+ assert proc.initFut == this;
+
+ proc.onInitialized(res.get1(), res.get2());
+
+ if (!F.isEmpty(proc.reducers()))
+ jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
+ }
+ else {
+ // Clean up since init failed.
+ runningProcsByJobId.remove(jobId);
+ runningProcsByProcId.remove(childProcId);
+ }
+
+ if (super.onDone(res, err)) {
+ if (err == null) {
+ if (log.isDebugEnabled())
+ log.debug("Initialized child process for external task execution [jobId=" + jobId +
+ ", desc=" + desc + ", initTime=" + duration() + ']');
+ }
+ else
+ U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
+ ", desc=" + desc + ']', err);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
new file mode 100644
index 0000000..f0acc9f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * External task metadata (classpath, JVM options) needed to start external process execution.
+ */
+public class HadoopExternalTaskMetadata {
+ /** Process classpath. */
+ private Collection<String> classpath;
+
+ /** JVM options. */
+ @GridToStringInclude
+ private Collection<String> jvmOpts;
+
+ /**
+ * @return JVM Options.
+ */
+ public Collection<String> jvmOptions() {
+ return jvmOpts;
+ }
+
+ /**
+ * @param jvmOpts JVM options.
+ */
+ public void jvmOptions(Collection<String> jvmOpts) {
+ this.jvmOpts = jvmOpts;
+ }
+
+ /**
+ * @return Classpath.
+ */
+ public Collection<String> classpath() {
+ return classpath;
+ }
+
+ /**
+ * @param classpath Classpath.
+ */
+ public void classpath(Collection<String> classpath) {
+ this.classpath = classpath;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopExternalTaskMetadata.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
new file mode 100644
index 0000000..25c9408
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Job info update request.
+ */
+public class HadoopJobInfoUpdateRequest implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** Job phase. */
+ @GridToStringInclude
+ private HadoopJobPhase jobPhase;
+
+ /** Reducers addresses. */
+ @GridToStringInclude
+ private HadoopProcessDescriptor[] reducersAddrs;
+
+ /**
+ * Constructor required by {@link Externalizable}.
+ */
+ public HadoopJobInfoUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param jobPhase Job phase.
+ * @param reducersAddrs Reducers addresses.
+ */
+ public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase,
+ HadoopProcessDescriptor[] reducersAddrs) {
+ assert jobId != null;
+
+ this.jobId = jobId;
+ this.jobPhase = jobPhase;
+ this.reducersAddrs = reducersAddrs;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Job phase.
+ */
+ public HadoopJobPhase jobPhase() {
+ return jobPhase;
+ }
+
+ /**
+ * @return Reducers addresses.
+ */
+ public HadoopProcessDescriptor[] reducersAddresses() {
+ return reducersAddrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeObject(jobPhase);
+ U.writeArray(out, reducersAddrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+ jobId.readExternal(in);
+
+ jobPhase = (HadoopJobPhase)in.readObject();
+ reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopJobInfoUpdateRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
new file mode 100644
index 0000000..df44dd7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Child process initialization request.
+ */
+public class HadoopPrepareForJobRequest implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** Job info. */
+ @GridToStringInclude
+ private HadoopJobInfo jobInfo;
+
+ /** Total amount of reducers in the job. */
+ @GridToStringInclude
+ private int totalReducersCnt;
+
+ /** Reducers to be executed on current node. */
+ @GridToStringInclude
+ private int[] locReducers;
+
+ /**
+ * Constructor required by {@link Externalizable}.
+ */
+ public HadoopPrepareForJobRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param jobInfo Job info.
+ * @param totalReducersCnt Number of reducers in the job.
+ * @param locReducers Reducers to be executed on current node.
+ */
+ public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt,
+ int[] locReducers) {
+ assert jobId != null;
+
+ this.jobId = jobId;
+ this.jobInfo = jobInfo;
+ this.totalReducersCnt = totalReducersCnt;
+ this.locReducers = locReducers;
+ }
+
+ /**
+ * @return Job info.
+ */
+ public HadoopJobInfo jobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Reducers to be executed on current node.
+ */
+ public int[] localReducers() {
+ return locReducers;
+ }
+
+ /**
+ * @return Number of reducers in job.
+ */
+ public int totalReducerCount() {
+ return totalReducersCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeObject(jobInfo);
+ out.writeInt(totalReducersCnt);
+
+ U.writeIntArray(out, locReducers);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+ jobId.readExternal(in);
+
+ jobInfo = (HadoopJobInfo)in.readObject();
+ totalReducersCnt = in.readInt();
+
+ locReducers = U.readIntArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopPrepareForJobRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
new file mode 100644
index 0000000..dea73c3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Process descriptor used to identify process for which task is running.
+ */
+public class HadoopProcessDescriptor implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Parent node ID. */
+ private UUID parentNodeId;
+
+ /** Process ID. */
+ private UUID procId;
+
+ /** Address. */
+ private String addr;
+
+ /** TCP port. */
+ private int tcpPort;
+
+ /** Shared memory port. */
+ private int shmemPort;
+
+ /**
+ * @param parentNodeId Parent node ID.
+ * @param procId Process ID.
+ */
+ public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) {
+ this.parentNodeId = parentNodeId;
+ this.procId = procId;
+ }
+
+ /**
+ * Gets process ID.
+ *
+ * @return Process ID.
+ */
+ public UUID processId() {
+ return procId;
+ }
+
+ /**
+ * Gets parent node ID.
+ *
+ * @return Parent node ID.
+ */
+ public UUID parentNodeId() {
+ return parentNodeId;
+ }
+
+ /**
+ * Gets host address.
+ *
+ * @return Host address.
+ */
+ public String address() {
+ return addr;
+ }
+
+ /**
+ * Sets host address.
+ *
+ * @param addr Host address.
+ */
+ public void address(String addr) {
+ this.addr = addr;
+ }
+
+ /**
+ * @return Shared memory port.
+ */
+ public int sharedMemoryPort() {
+ return shmemPort;
+ }
+
+ /**
+ * Sets shared memory port.
+ *
+ * @param shmemPort Shared memory port.
+ */
+ public void sharedMemoryPort(int shmemPort) {
+ this.shmemPort = shmemPort;
+ }
+
+ /**
+ * @return TCP port.
+ */
+ public int tcpPort() {
+ return tcpPort;
+ }
+
+ /**
+ * Sets TCP port.
+ *
+ * @param tcpPort TCP port.
+ */
+ public void tcpPort(int tcpPort) {
+ this.tcpPort = tcpPort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof HadoopProcessDescriptor))
+ return false;
+
+ HadoopProcessDescriptor that = (HadoopProcessDescriptor)o;
+
+ return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = parentNodeId.hashCode();
+
+ result = 31 * result + procId.hashCode();
+
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcessDescriptor.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
new file mode 100644
index 0000000..49ff4bf
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Process started message.
+ */
+public class HadoopProcessStartedAck implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcessStartedAck.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
new file mode 100644
index 0000000..05e12ef
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Message sent from node to child process to start task(s) execution.
+ */
+public class HadoopTaskExecutionRequest implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** Job info. */
+ @GridToStringInclude
+ private HadoopJobInfo jobInfo;
+
+ /** Mappers. */
+ @GridToStringInclude
+ private Collection<HadoopTaskInfo> tasks;
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @param jobId Job ID.
+ */
+ public void jobId(HadoopJobId jobId) {
+ this.jobId = jobId;
+ }
+
+ /**
+ * @return Jon info.
+ */
+ public HadoopJobInfo jobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * @param jobInfo Job info.
+ */
+ public void jobInfo(HadoopJobInfo jobInfo) {
+ this.jobInfo = jobInfo;
+ }
+
+ /**
+ * @return Tasks.
+ */
+ public Collection<HadoopTaskInfo> tasks() {
+ return tasks;
+ }
+
+ /**
+ * @param tasks Tasks.
+ */
+ public void tasks(Collection<HadoopTaskInfo> tasks) {
+ this.tasks = tasks;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTaskExecutionRequest.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeObject(jobInfo);
+ U.writeCollection(out, tasks);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+ jobId.readExternal(in);
+
+ jobInfo = (HadoopJobInfo)in.readObject();
+ tasks = U.readCollection(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
new file mode 100644
index 0000000..d3639c7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Task finished message. Sent when local task finishes execution.
+ */
+public class HadoopTaskFinishedMessage implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Finished task info. */
+ private HadoopTaskInfo taskInfo;
+
+ /** Task finish status. */
+ private HadoopTaskStatus status;
+
+ /**
+ * Constructor required by {@link Externalizable}.
+ */
+ public HadoopTaskFinishedMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param taskInfo Finished task info.
+ * @param status Task finish status.
+ */
+ public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus status) {
+ assert taskInfo != null;
+ assert status != null;
+
+ this.taskInfo = taskInfo;
+ this.status = status;
+ }
+
+ /**
+ * @return Finished task info.
+ */
+ public HadoopTaskInfo taskInfo() {
+ return taskInfo;
+ }
+
+ /**
+ * @return Task finish status.
+ */
+ public HadoopTaskStatus status() {
+ return status;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTaskFinishedMessage.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ taskInfo.writeExternal(out);
+ status.writeExternal(out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ taskInfo = new HadoopTaskInfo();
+ taskInfo.readExternal(in);
+
+ status = new HadoopTaskStatus();
+ status.readExternal(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
deleted file mode 100644
index 2d00222..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
-
-/**
- * Hadoop process base.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public class GridHadoopChildProcessRunner {
- /** Node process descriptor. */
- private GridHadoopProcessDescriptor nodeDesc;
-
- /** Message processing executor service. */
- private ExecutorService msgExecSvc;
-
- /** Task executor service. */
- private GridHadoopExecutorService execSvc;
-
- /** */
- protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- /** External communication. */
- private GridHadoopExternalCommunication comm;
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Init guard. */
- private final AtomicBoolean initGuard = new AtomicBoolean();
-
- /** Start time. */
- private long startTime;
-
- /** Init future. */
- private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>();
-
- /** Job instance. */
- private GridHadoopJob job;
-
- /** Number of uncompleted tasks. */
- private final AtomicInteger pendingTasks = new AtomicInteger();
-
- /** Shuffle job. */
- private GridHadoopShuffleJob<GridHadoopProcessDescriptor> shuffleJob;
-
- /** Concurrent mappers. */
- private int concMappers;
-
- /** Concurrent reducers. */
- private int concReducers;
-
- /**
- * Starts child process runner.
- */
- public void start(GridHadoopExternalCommunication comm, GridHadoopProcessDescriptor nodeDesc,
- ExecutorService msgExecSvc, IgniteLogger parentLog)
- throws IgniteCheckedException {
- this.comm = comm;
- this.nodeDesc = nodeDesc;
- this.msgExecSvc = msgExecSvc;
-
- comm.setListener(new MessageListener());
- log = parentLog.getLogger(GridHadoopChildProcessRunner.class);
-
- startTime = U.currentTimeMillis();
-
- // At this point node knows that this process has started.
- comm.sendMessage(this.nodeDesc, new GridHadoopProcessStartedAck());
- }
-
- /**
- * Initializes process for task execution.
- *
- * @param req Initialization request.
- */
- private void prepareProcess(GridHadoopPrepareForJobRequest req) {
- if (initGuard.compareAndSet(false, true)) {
- try {
- if (log.isDebugEnabled())
- log.debug("Initializing external hadoop task: " + req);
-
- assert job == null;
-
- job = req.jobInfo().createJob(req.jobId(), log);
-
- job.initialize(true, nodeDesc.processId());
-
- shuffleJob = new GridHadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
- req.totalReducerCount(), req.localReducers());
-
- initializeExecutors(req);
-
- if (log.isDebugEnabled())
- log.debug("External process initialized [initWaitTime=" +
- (U.currentTimeMillis() - startTime) + ']');
-
- initFut.onDone(null, null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to initialize process: " + req, e);
-
- initFut.onDone(e);
- }
- }
- else
- log.warning("Duplicate initialize process request received (will ignore): " + req);
- }
-
- /**
- * @param req Task execution request.
- */
- private void runTasks(final GridHadoopTaskExecutionRequest req) {
- if (!initFut.isDone() && log.isDebugEnabled())
- log.debug("Will wait for process initialization future completion: " + req);
-
- initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- try {
- // Make sure init was successful.
- f.get();
-
- boolean set = pendingTasks.compareAndSet(0, req.tasks().size());
-
- assert set;
-
- GridHadoopTaskInfo info = F.first(req.tasks());
-
- assert info != null;
-
- int size = info.type() == MAP ? concMappers : concReducers;
-
-// execSvc.setCorePoolSize(size);
-// execSvc.setMaximumPoolSize(size);
-
- if (log.isDebugEnabled())
- log.debug("Set executor service size for task type [type=" + info.type() +
- ", size=" + size + ']');
-
- for (GridHadoopTaskInfo taskInfo : req.tasks()) {
- if (log.isDebugEnabled())
- log.debug("Submitted task for external execution: " + taskInfo);
-
- execSvc.submit(new GridHadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
- @Override protected void onTaskFinished(GridHadoopTaskStatus status) {
- onTaskFinished0(this, status);
- }
-
- @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx)
- throws IgniteCheckedException {
- return shuffleJob.input(ctx);
- }
-
- @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx)
- throws IgniteCheckedException {
- return shuffleJob.output(ctx);
- }
- });
- }
- }
- catch (IgniteCheckedException e) {
- for (GridHadoopTaskInfo info : req.tasks())
- notifyTaskFinished(info, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
- }
- }
- });
- }
-
- /**
- * Creates executor services.
- *
- * @param req Init child process request.
- */
- private void initializeExecutors(GridHadoopPrepareForJobRequest req) {
- int cpus = Runtime.getRuntime().availableProcessors();
-//
-// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
-// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
-
- execSvc = new GridHadoopExecutorService(log, "", cpus * 2, 1024);
- }
-
- /**
- * Updates external process map so that shuffle can proceed with sending messages to reducers.
- *
- * @param req Update request.
- */
- private void updateTasks(final GridHadoopJobInfoUpdateRequest req) {
- initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> gridFut) {
- assert initGuard.get();
-
- assert req.jobId().equals(job.id());
-
- if (req.reducersAddresses() != null) {
- if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
- shuffleJob.startSending("external",
- new IgniteInClosure2X<GridHadoopProcessDescriptor, GridHadoopShuffleMessage>() {
- @Override public void applyx(GridHadoopProcessDescriptor dest,
- GridHadoopShuffleMessage msg) throws IgniteCheckedException {
- comm.sendMessage(dest, msg);
- }
- });
- }
- }
- }
- });
- }
-
- /**
- * Stops all executors and running tasks.
- */
- private void shutdown() {
- if (execSvc != null)
- execSvc.shutdown(5000);
-
- if (msgExecSvc != null)
- msgExecSvc.shutdownNow();
-
- try {
- job.dispose(true);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to dispose job.", e);
- }
- }
-
- /**
- * Notifies node about task finish.
- *
- * @param run Finished task runnable.
- * @param status Task status.
- */
- private void onTaskFinished0(GridHadoopRunnableTask run, GridHadoopTaskStatus status) {
- GridHadoopTaskInfo info = run.taskInfo();
-
- int pendingTasks0 = pendingTasks.decrementAndGet();
-
- if (log.isDebugEnabled())
- log.debug("Hadoop task execution finished [info=" + info
- + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
- ", pendingTasks=" + pendingTasks0 +
- ", err=" + status.failCause() + ']');
-
- assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
-
- boolean flush = pendingTasks0 == 0 && info.type() == MAP;
-
- notifyTaskFinished(info, status, flush);
- }
-
- /**
- * @param taskInfo Finished task info.
- * @param status Task status.
- */
- private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final GridHadoopTaskStatus status,
- boolean flush) {
-
- final GridHadoopTaskState state = status.state();
- final Throwable err = status.failCause();
-
- if (!flush) {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
- ", err=" + err + ']');
-
- comm.sendMessage(nodeDesc, new GridHadoopTaskFinishedMessage(taskInfo, status));
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to send message to parent node (will terminate child process).", e);
-
- shutdown();
-
- terminate();
- }
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
- taskInfo + ", state=" + state + ", err=" + err + ']');
-
- final long start = U.currentTimeMillis();
-
- try {
- shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- long end = U.currentTimeMillis();
-
- if (log.isDebugEnabled())
- log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
- ", flushTime=" + (end - start) + ']');
-
- try {
- // Check for errors on shuffle.
- f.get();
-
- notifyTaskFinished(taskInfo, status, false);
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
- ", state=" + state + ", err=" + err + ']', e);
-
- notifyTaskFinished(taskInfo,
- new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
- }
- }
- });
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
- ", state=" + state + ", err=" + err + ']', e);
-
- notifyTaskFinished(taskInfo, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
- }
- }
- }
-
- /**
- * Checks if message was received from parent node and prints warning if not.
- *
- * @param desc Sender process ID.
- * @param msg Received message.
- * @return {@code True} if received from parent node.
- */
- private boolean validateNodeMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
- if (!nodeDesc.processId().equals(desc.processId())) {
- log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
- ", msg=" + msg + ']');
-
- return false;
- }
-
- return true;
- }
-
- /**
- * Stops execution of this process.
- */
- private void terminate() {
- System.exit(1);
- }
-
- /**
- * Message listener.
- */
- private class MessageListener implements GridHadoopMessageListener {
- /** {@inheritDoc} */
- @Override public void onMessageReceived(final GridHadoopProcessDescriptor desc, final GridHadoopMessage msg) {
- if (msg instanceof GridHadoopTaskExecutionRequest) {
- if (validateNodeMessage(desc, msg))
- runTasks((GridHadoopTaskExecutionRequest)msg);
- }
- else if (msg instanceof GridHadoopJobInfoUpdateRequest) {
- if (validateNodeMessage(desc, msg))
- updateTasks((GridHadoopJobInfoUpdateRequest)msg);
- }
- else if (msg instanceof GridHadoopPrepareForJobRequest) {
- if (validateNodeMessage(desc, msg))
- prepareProcess((GridHadoopPrepareForJobRequest)msg);
- }
- else if (msg instanceof GridHadoopShuffleMessage) {
- if (log.isTraceEnabled())
- log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
-
- initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- try {
- GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;
-
- shuffleJob.onShuffleMessage(m);
-
- comm.sendMessage(desc, new GridHadoopShuffleAck(m.id(), m.jobId()));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
- }
- }
- });
- }
- else if (msg instanceof GridHadoopShuffleAck) {
- if (log.isTraceEnabled())
- log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
-
- shuffleJob.onShuffleAck((GridHadoopShuffleAck)msg);
- }
- else
- log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
- }
-
- /** {@inheritDoc} */
- @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
- if (log.isDebugEnabled())
- log.debug("Lost connection with remote process: " + desc);
-
- if (desc == null)
- U.warn(log, "Handshake failed.");
- else if (desc.processId().equals(nodeDesc.processId())) {
- log.warning("Child process lost connection with parent node (will terminate child process).");
-
- shutdown();
-
- terminate();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
deleted file mode 100644
index 5aeeeee..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.logger.log4j.*;
-import org.apache.ignite.marshaller.optimized.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Hadoop external process base class.
- */
-public class GridHadoopExternalProcessStarter {
- /** Path to Log4j configuration file. */
- public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
-
- /** Arguments. */
- private Args args;
-
- /** System out. */
- private OutputStream out;
-
- /** System err. */
- private OutputStream err;
-
- /**
- * @param args Parsed arguments.
- */
- public GridHadoopExternalProcessStarter(Args args) {
- this.args = args;
- }
-
- /**
- * @param cmdArgs Process arguments.
- */
- public static void main(String[] cmdArgs) {
- try {
- Args args = arguments(cmdArgs);
-
- new GridHadoopExternalProcessStarter(args).run();
- }
- catch (Exception e) {
- System.err.println("Failed");
-
- System.err.println(e.getMessage());
-
- e.printStackTrace(System.err);
- }
- }
-
- /**
- *
- * @throws Exception
- */
- public void run() throws Exception {
- U.setWorkDirectory(args.workDir, U.getIgniteHome());
-
- File outputDir = outputDirectory();
-
- initializeStreams(outputDir);
-
- ExecutorService msgExecSvc = Executors.newFixedThreadPool(
- Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2));
-
- IgniteLogger log = logger(outputDir);
-
- GridHadoopExternalCommunication comm = new GridHadoopExternalCommunication(
- args.nodeId,
- args.childProcId,
- new OptimizedMarshaller(),
- log,
- msgExecSvc,
- "external"
- );
-
- comm.start();
-
- GridHadoopProcessDescriptor nodeDesc = new GridHadoopProcessDescriptor(args.nodeId, args.parentProcId);
- nodeDesc.address(args.addr);
- nodeDesc.tcpPort(args.tcpPort);
- nodeDesc.sharedMemoryPort(args.shmemPort);
-
- GridHadoopChildProcessRunner runner = new GridHadoopChildProcessRunner();
-
- runner.start(comm, nodeDesc, msgExecSvc, log);
-
- System.err.println("Started");
- System.err.flush();
-
- System.setOut(new PrintStream(out));
- System.setErr(new PrintStream(err));
- }
-
- /**
- * @param outputDir Directory for process output.
- * @throws Exception
- */
- private void initializeStreams(File outputDir) throws Exception {
- out = new FileOutputStream(new File(outputDir, args.childProcId + ".out"));
- err = new FileOutputStream(new File(outputDir, args.childProcId + ".err"));
- }
-
- /**
- * @return Path to output directory.
- * @throws IOException If failed.
- */
- private File outputDirectory() throws IOException {
- File f = new File(args.out);
-
- if (!f.exists()) {
- if (!f.mkdirs())
- throw new IOException("Failed to create output directory: " + args.out);
- }
- else {
- if (f.isFile())
- throw new IOException("Output directory is a file: " + args.out);
- }
-
- return f;
- }
-
- /**
- * @param outputDir Directory for process output.
- * @return Logger.
- */
- private IgniteLogger logger(final File outputDir) {
- final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
-
- Log4JLogger logger;
-
- try {
- logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true);
- }
- catch (IgniteCheckedException e) {
- System.err.println("Failed to create URL-based logger. Will use default one.");
-
- e.printStackTrace();
-
- logger = new Log4JLogger(true);
- }
-
- logger.updateFilePath(new IgniteClosure<String, String>() {
- @Override public String apply(String s) {
- return new File(outputDir, args.childProcId + ".log").getAbsolutePath();
- }
- });
-
- return logger;
- }
-
- /**
- * @param processArgs Process arguments.
- * @return Child process instance.
- */
- private static Args arguments(String[] processArgs) throws Exception {
- Args args = new Args();
-
- for (int i = 0; i < processArgs.length; i++) {
- String arg = processArgs[i];
-
- switch (arg) {
- case "-cpid": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing process ID for '-cpid' parameter");
-
- String procIdStr = processArgs[++i];
-
- args.childProcId = UUID.fromString(procIdStr);
-
- break;
- }
-
- case "-ppid": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing process ID for '-ppid' parameter");
-
- String procIdStr = processArgs[++i];
-
- args.parentProcId = UUID.fromString(procIdStr);
-
- break;
- }
-
- case "-nid": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing node ID for '-nid' parameter");
-
- String nodeIdStr = processArgs[++i];
-
- args.nodeId = UUID.fromString(nodeIdStr);
-
- break;
- }
-
- case "-addr": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing node address for '-addr' parameter");
-
- args.addr = processArgs[++i];
-
- break;
- }
-
- case "-tport": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing tcp port for '-tport' parameter");
-
- args.tcpPort = Integer.parseInt(processArgs[++i]);
-
- break;
- }
-
- case "-sport": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing shared memory port for '-sport' parameter");
-
- args.shmemPort = Integer.parseInt(processArgs[++i]);
-
- break;
- }
-
- case "-out": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing output folder name for '-out' parameter");
-
- args.out = processArgs[++i];
-
- break;
- }
-
- case "-wd": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing work folder name for '-wd' parameter");
-
- args.workDir = processArgs[++i];
-
- break;
- }
- }
- }
-
- return args;
- }
-
- /**
- * Execution arguments.
- */
- private static class Args {
- /** Process ID. */
- private UUID childProcId;
-
- /** Process ID. */
- private UUID parentProcId;
-
- /** Process ID. */
- private UUID nodeId;
-
- /** Node address. */
- private String addr;
-
- /** TCP port */
- private int tcpPort;
-
- /** Shmem port. */
- private int shmemPort = -1;
-
- /** Output folder. */
- private String out;
-
- /** Work directory. */
- private String workDir;
- }
-}