You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/03/05 10:05:50 UTC
[50/58] [abbrv] incubator-ignite git commit: Merge branch 'sprint-2'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into
ignite-futures-cleanup-1
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index 0000000,04a96de..4b749f3
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@@ -1,0 -1,960 +1,958 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+ import org.apache.ignite.internal.processors.hadoop.message.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.spi.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.locks.*;
+
+ import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
+
+ /**
+ * External process registry. Handles external process lifecycle.
+ */
+ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
+ /** Hadoop context. */
+ private HadoopContext ctx;
+
+ /** Path to the java executable used to launch child processes; assigned in {@code initJavaCommand()}. */
+ private String javaCmd;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Node process descriptor, taken from {@code comm.localProcessDescriptor()} on start. */
+ private HadoopProcessDescriptor nodeDesc;
+
+ /** Output base: the resolved {@code "hadoop"} work directory under which per-job folders are named. */
+ private File outputBase;
+
+ /** Path separator ({@code path.separator} system property) used to join classpath entries. */
+ private String pathSep;
+
+ /** Hadoop external communication. */
+ private HadoopExternalCommunication comm;
+
+ /** Child processes keyed by child process ID. */
+ private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
+
+ /** Child processes keyed by job ID. */
+ private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
+
+ /** Busy lock. */
+ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Resolves the output directory, path separator and java command, then creates and starts the external
+ * communication component and registers its ports through {@code ctx.kernalContext().ports()}.
+ */
+ @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+ this.ctx = ctx;
+
+ log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
+
+ outputBase = U.resolveWorkDirectory("hadoop", false);
+
+ pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
+
+ initJavaCommand(); // Throws if the java binary cannot be located or executed.
+
+ comm = new HadoopExternalCommunication(
+ ctx.localNodeId(),
+ UUID.randomUUID(),
+ ctx.kernalContext().config().getMarshaller(),
+ log,
+ ctx.kernalContext().getSystemExecutorService(),
+ ctx.kernalContext().gridName());
+
+ comm.setListener(new MessageListener());
+
+ comm.start();
+
+ nodeDesc = comm.localProcessDescriptor();
+
+ ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
+ HadoopExternalTaskExecutor.class);
+
+ if (nodeDesc.sharedMemoryPort() != -1) // -1 is treated here as "no shared memory port".
+ ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
+ HadoopExternalTaskExecutor.class);
+
+ jobTracker = ctx.jobTracker();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops the external communication component. The {@code cancel} flag is not used by this implementation.
+ * NOTE(review): the busy write lock acquired here is never released in this method; presumably this is
+ * intentional so that later {@code busyLock.tryReadLock()} calls fail after stop - confirm against
+ * {@code GridSpinReadWriteLock} semantics.
+ */
+ @Override public void stop(boolean cancel) {
+ busyLock.writeLock();
+
+ try {
+ comm.stop();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * If a child process is registered for the job, it is either terminated (job phase is
+ * {@code PHASE_COMPLETE}) or sent a job info update once its init future has completed without failure.
+ * If no process is registered and {@code ctx.isParticipating(meta)} returns {@code true}, a new child
+ * process is started for the job.
+ */
+ @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
+ final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
+
+ // If we have a local process for this job.
+ if (proc != null) {
+ if (log.isDebugEnabled())
+ log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
+
+ if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+ if (log.isDebugEnabled())
+ log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
+ ", proc=" + proc + ']');
+
+ runningProcsByJobId.remove(meta.jobId());
+ runningProcsByProcId.remove(proc.descriptor().processId()); // NOTE(review): descriptor() is null until init completes - confirm this cannot be reached earlier.
+
+ proc.terminate();
+
+ return;
+ }
+
+ if (proc.initFut.isDone()) {
+ if (!proc.initFut.isFailed())
+ sendJobInfoUpdate(proc, meta);
+ else if (log.isDebugEnabled())
+ log.debug("Failed to initialize child process (will skip job state notification) " +
+ "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
+ }
+ else {
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ try {
+ f.get(); // Throws if initialization failed; the update is then skipped.
+
+ sendJobInfoUpdate(proc, meta);
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to initialize child process (will skip job state notification) " +
+ "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
+ }
+
+ }
+ });
+ }
+ }
+ else if (ctx.isParticipating(meta)) {
+ HadoopJob job;
+
+ try {
+ job = jobTracker.job(meta.jobId(), meta.jobInfo());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get job: " + meta.jobId(), e);
+
+ return;
+ }
+
+ startProcess(job, meta.mapReducePlan());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Does nothing if the busy read lock cannot be acquired. For SETUP, ABORT and COMMIT tasks a new child
+ * process is started when none is registered or the registered one is terminated. Once the process init
+ * future completes, the tasks are recorded on the process and an execution request is sent; if init
+ * failed, the tasks are reported to the job tracker as {@code FAILED}.
+ */
+ @SuppressWarnings("ConstantConditions")
+ @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock()) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
+
+ return;
+ }
+
+ try {
+ HadoopProcess proc = runningProcsByJobId.get(job.id());
+
+ HadoopTaskType taskType = F.first(tasks).type(); // Type of the first task decides the branch; assumes tasks is non-empty - TODO confirm.
+
+ if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT ||
+ taskType == HadoopTaskType.COMMIT) {
+ if (proc == null || proc.terminated()) {
+ runningProcsByJobId.remove(job.id(), proc);
+
+ // Start new process for ABORT task since previous processes were killed.
+ proc = startProcess(job, jobTracker.plan(job.id()));
+
+ if (log.isDebugEnabled())
+ log.debug("Starting new process for maintenance task [jobId=" + job.id() +
+ ", proc=" + proc + ", taskType=" + taskType + ']');
+ }
+ }
+ else
+ assert proc != null : "Missing started process for task execution request: " + job.id() +
+ ", tasks=" + tasks;
+
+ final HadoopProcess proc0 = proc;
+
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ f.get(); // Throws if process initialization failed.
+
+ proc0.addTasks(tasks);
+
+ if (log.isDebugEnabled())
+ log.debug("Sending task execution request to child process [jobId=" + job.id() +
+ ", proc=" + proc0 + ", tasks=" + tasks + ']');
+
+ sendExecutionRequest(proc0, job, tasks);
+ }
+ catch (IgniteCheckedException e) {
+ notifyTasksFailed(tasks, FAILED, e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ });
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Terminates the child process registered for the job, if any. The process is not removed from the
+ * lookup maps by this method.
+ */
+ @Override public void cancelTasks(HadoopJobId jobId) {
+ HadoopProcess proc = runningProcsByJobId.get(jobId);
+
+ if (proc != null)
+ proc.terminate();
+ }
+
+ /**
+ * Sends execution request to remote node.
+ * <p>
+ * The request is built and sent while holding the process lock. If the process is already marked as
+ * terminated, nothing is sent and the tasks are reported to the job tracker as {@code CRASHED}.
+ *
+ * @param proc Process to send request to.
+ * @param job Job instance.
+ * @param tasks Collection of tasks to execute in started process.
+ * @throws IgniteCheckedException If sending the message through the communication component failed.
+ */
+ private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks)
+ throws IgniteCheckedException {
+ // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
+ proc.lock();
+
+ try {
+ if (proc.terminated()) {
+ notifyTasksFailed(tasks, CRASHED, null);
+
+ return;
+ }
+
+ HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
+
+ req.jobId(job.id());
+ req.jobInfo(job.info());
+ req.tasks(tasks);
+
+ comm.sendMessage(proc.descriptor(), req);
+ }
+ finally {
+ proc.unlock();
+ }
+ }
+
+ /**
+ * Builds start metadata for a child process: the classpath is copied from this JVM's
+ * {@code java.class.path} (split on {@code File.pathSeparator}) and the JVM options are a fixed list
+ * plus {@code -DIGNITE_HOME} taken from {@code U.getIgniteHome()}.
+ *
+ * @return External task metadata.
+ */
+ private HadoopExternalTaskMetadata buildTaskMeta() {
+ HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata();
+
+ meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
+ meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
+ "-DIGNITE_HOME=" + U.getIgniteHome()));
+
+ return meta;
+ }
+
+ /**
+ * @param tasks Tasks to notify about.
+ * @param state Fail state.
+ * @param e Optional error.
+ */
+ private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
+ HadoopTaskStatus fail = new HadoopTaskStatus(state, e);
+
+ for (HadoopTaskInfo task : tasks)
+ jobTracker.onTaskFinished(task, fail);
+ }
+
+ /**
+ * Starts process template that will be ready to execute Hadoop tasks.
+ * <p>
+ * Registers the new process in both lookup maps, then submits a closure that launches the child JVM and
+ * reads its output until a {@code "Started"} or {@code "Failed"} line is seen. If the output ends before
+ * either line is seen, the init future is completed with an error so that listeners are not left waiting
+ * on a future that would otherwise never complete. After successful init a prepare request is sent.
+ *
+ * @param job Job instance.
+ * @param plan Map reduce plan.
+ * @return Registered process whose init future completes when the child is ready or has failed to start.
+ */
+ private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) {
+ final UUID childProcId = UUID.randomUUID();
+
+ HadoopJobId jobId = job.id();
+
+ final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
+
+ final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
+
+ HadoopProcess old = runningProcsByJobId.put(jobId, proc);
+
+ assert old == null;
+
+ old = runningProcsByProcId.put(childProcId, proc);
+
+ assert old == null;
+
+ ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ if (!busyLock.tryReadLock()) {
+ fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
+
+ return;
+ }
+
+ try {
+ HadoopExternalTaskMetadata startMeta = buildTaskMeta();
+
+ if (log.isDebugEnabled())
+ log.debug("Created hadoop child process metadata for job [job=" + job +
+ ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
+
+ // Named differently from the enclosing HadoopProcess variable to avoid shadowing it.
+ Process javaProc = startJavaProcess(childProcId, startMeta, job);
+
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(javaProc.getInputStream()));
+
+ String line;
+
+ // Set once the child has reported either "Started" or "Failed".
+ boolean statusReported = false;
+
+ // Read up all the process output.
+ while ((line = rdr.readLine()) != null) {
+ if (log.isDebugEnabled())
+ log.debug("Tracing process output: " + line);
+
+ if ("Started".equals(line)) {
+ // Process started successfully, it should not write anything more to the output stream.
+ if (log.isDebugEnabled())
+ log.debug("Successfully started child process [childProcId=" + childProcId +
+ ", meta=" + job + ']');
+
+ fut.onProcessStarted(javaProc);
+
+ statusReported = true;
+
+ break;
+ }
+ else if ("Failed".equals(line)) {
+ StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
+
+ while ((line = rdr.readLine()) != null)
+ sb.append(" ").append(line).append("\n");
+
+ // Cut last character.
+ sb.setLength(sb.length() - 1);
+
+ log.warning(sb.toString());
+
+ fut.onDone(new IgniteCheckedException(sb.toString()));
+
+ statusReported = true;
+
+ break;
+ }
+ }
+
+ // Output ended without a status line: fail the future instead of leaving it incomplete.
+ if (!statusReported)
+ fut.onDone(new IgniteCheckedException("Child process output ended before start status was " +
+ "reported: " + job));
+ }
+ catch (Throwable e) {
+ fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }, true);
+
+ fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ try {
+ // Make sure there were no exceptions.
+ f.get();
+
+ prepareForJob(proc, job, plan);
+ }
+ catch (IgniteCheckedException ignore) {
+ // Exception is printed in future's onDone() method.
+ }
+ }
+ });
+
+ return proc;
+ }
+
+ /**
+ * Checks that java local command is available.
+ * <p>
+ * Java home is taken from the {@code java.home} system property, falling back to the {@code JAVA_HOME}
+ * environment variable. The resulting command is stored in {@code javaCmd} and verified by running it
+ * with {@code -version}; its output is logged at info level.
+ *
+ * @throws IgniteCheckedException If initialization failed: java home is unknown, the command could not
+ * be started, it exited with a nonzero code, or the waiting thread was interrupted.
+ */
+ private void initJavaCommand() throws IgniteCheckedException {
+ String javaHome = System.getProperty("java.home");
+
+ if (javaHome == null)
+ javaHome = System.getenv("JAVA_HOME");
+
+ if (javaHome == null)
+ throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
+
+ javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
+
+ try {
+ Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start(); // stderr is merged into stdout.
+
+ Collection<String> out = readProcessOutput(proc);
+
+ int res = proc.waitFor();
+
+ if (res != 0)
+ throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
+ "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
+
+ if (log.isInfoEnabled()) {
+ log.info("Will use java for external task execution: ");
+
+ for (String s : out)
+ log.info(" " + s);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to check java for external task execution.", e);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore the interrupt flag before rethrowing.
+
+ throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
+ }
+ }
+
+ /**
+ * Reads process output line-by-line until end of stream.
+ * <p>
+ * The reader (and with it the process output stream) is closed before returning, including when a read
+ * fails, so the underlying stream is not leaked.
+ *
+ * @param proc Process to read output.
+ * @return Read lines, in the order they were produced; empty if the process wrote nothing.
+ * @throws IOException If read failed.
+ */
+ private Collection<String> readProcessOutput(Process proc) throws IOException {
+ Collection<String> res = new ArrayList<>();
+
+ try (BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
+ String s;
+
+ while ((s = rdr.readLine()) != null)
+ res.add(s);
+ }
+
+ return res;
+ }
+
+ /**
+ * Builds process from metadata.
+ * <p>
+ * Assembles the command line (java command, JVM options, classpath, starter class and its arguments
+ * describing this node and the child) and starts it in the resolved work directory with the error
+ * stream merged into standard output.
+ *
+ * @param childProcId Child process ID.
+ * @param startMeta Metadata.
+ * @param job Job.
+ * @return Started process.
+ * @throws Exception If the work directory could not be resolved or the process could not be started.
+ */
+ private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
+ HadoopJob job) throws Exception {
+ String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
+
+ if (log.isDebugEnabled())
+ log.debug("Will write process log output to: " + outFldr);
+
+ List<String> cmd = new ArrayList<>();
+
+ File workDir = U.resolveWorkDirectory("", false);
+
+ cmd.add(javaCmd);
+ cmd.addAll(startMeta.jvmOptions());
+ cmd.add("-cp");
+ cmd.add(buildClasspath(startMeta.classpath()));
+ cmd.add(HadoopExternalProcessStarter.class.getName());
+ cmd.add("-cpid");
+ cmd.add(String.valueOf(childProcId));
+ cmd.add("-ppid");
+ cmd.add(String.valueOf(nodeDesc.processId()));
+ cmd.add("-nid");
+ cmd.add(String.valueOf(nodeDesc.parentNodeId()));
+ cmd.add("-addr");
+ cmd.add(nodeDesc.address());
+ cmd.add("-tport");
+ cmd.add(String.valueOf(nodeDesc.tcpPort()));
+ cmd.add("-sport");
+ cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
+ cmd.add("-out");
+ cmd.add(outFldr);
+ cmd.add("-wd");
+ cmd.add(workDir.getAbsolutePath());
+
+ return new ProcessBuilder(cmd)
+ .redirectErrorStream(true)
+ .directory(workDir)
+ .start();
+ }
+
+ /**
+ * Composes the path of the folder used for the given job: output base, file separator, then
+ * {@code "Job_"} followed by the job ID.
+ *
+ * @param jobId Job ID.
+ * @return Job work folder.
+ */
+ private String jobWorkFolder(HadoopJobId jobId) {
+ StringBuilder path = new StringBuilder();
+
+ path.append(outputBase).append(File.separator).append("Job_").append(jobId);
+
+ return path.toString();
+ }
+
+ /**
+ * Joins classpath entries into a single string using the configured path separator.
+ *
+ * @param cp Classpath collection, must not be empty.
+ * @return Classpath string without a trailing separator.
+ */
+ private String buildClasspath(Collection<String> cp) {
+ assert !cp.isEmpty();
+
+ StringBuilder sb = new StringBuilder();
+
+ for (String s : cp)
+ sb.append(s).append(pathSep);
+
+ // Drop the whole trailing separator; it comes from a system property and need not be one character.
+ sb.setLength(sb.length() - pathSep.length());
+
+ return sb.toString();
+ }
+
+ /**
+ * Sends job info update request to remote process.
+ * <p>
+ * Reducer addresses are included only when an address is known for every reducer in the plan; otherwise
+ * {@code null} is sent in their place. If sending fails and the process is not already terminated, the
+ * error is logged and the process is terminated.
+ *
+ * @param proc Process to send request to.
+ * @param meta Job metadata.
+ */
+ private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) {
+ Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
+
+ int rdcNum = meta.mapReducePlan().reducers();
+
+ HadoopProcessDescriptor[] addrs = null;
+
+ if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
+ addrs = new HadoopProcessDescriptor[rdcNum];
+
+ for (int i = 0; i < rdcNum; i++) {
+ HadoopProcessDescriptor desc = rdcAddrs.get(i); // Map is looked up by reducer index 0..rdcNum-1.
+
+ assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
+
+ addrs[i] = desc;
+ }
+ }
+
+ try {
+ comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+ }
+ catch (IgniteCheckedException e) {
+ if (!proc.terminated()) {
+ log.error("Failed to send job state update message to remote child process (will kill the process) " +
+ "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
+
+ proc.terminate();
+ }
+ }
+ }
+
+ /**
+ * Sends prepare request to remote process.
+ * <p>
+ * The request carries the job ID and info, the total reducer count of the plan and the reducers planned
+ * for the local node. If sending fails, the error is logged and the process is terminated.
+ *
+ * @param proc Process to send request to.
+ * @param job Job.
+ * @param plan Map reduce plan.
+ */
+ private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) {
+ try {
+ comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
+ plan.reducers(), plan.reducers(ctx.localNodeId())));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
+ ", plan=" + plan + ']', e);
+
+ proc.terminate();
+ }
+ }
+
+ /**
+ * Processes task finished message.
+ * <p>
+ * Removes the task from the sending process' task collection when that process is still registered,
+ * and always forwards the task status to the job tracker.
+ *
+ * @param desc Remote process descriptor.
+ * @param taskMsg Task finished message.
+ */
+ private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ if (proc != null)
+ proc.removeTask(taskMsg.taskInfo());
+
+ jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
+ }
+
+ /**
+ * Listener registered on the external communication component. Handles process-started acknowledgements
+ * and task-finished messages from child processes, and reacts to lost connections. Both callbacks are
+ * skipped when the busy read lock cannot be acquired.
+ */
+ private class MessageListener implements HadoopMessageListener {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code HadoopProcessStartedAck} is passed to the process init future; a
+ * {@code HadoopTaskFinishedMessage} is forwarded to {@code processTaskFinishedMessage()}; any other
+ * message is logged as unexpected.
+ */
+ @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ if (msg instanceof HadoopProcessStartedAck) {
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ assert proc != null : "Missing child process for processId: " + desc;
+
+ HadoopProcessFuture fut = proc.initFut;
+
+ if (fut != null)
+ fut.onReplyReceived(desc);
+ // Safety.
+ else
+ log.warning("Failed to find process start future (will ignore): " + desc);
+ }
+ else if (msg instanceof HadoopTaskFinishedMessage) {
+ HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg;
+
+ processTaskFinishedMessage(desc, taskMsg);
+ }
+ else
+ log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} descriptor is reported as a failed handshake. Otherwise, if the process is still
+ * registered, its outstanding tasks are reported to the job tracker as {@code CRASHED}, the job
+ * mapping is removed (only when there were outstanding tasks) and the process is terminated.
+ */
+ @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ if (desc == null) {
+ U.warn(log, "Handshake failed.");
+
+ return;
+ }
+
+ // Notify job tracker about failed tasks.
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+ if (proc != null) {
+ Collection<HadoopTaskInfo> tasks = proc.tasks();
+
+ if (!F.isEmpty(tasks)) {
+ log.warning("Lost connection with alive process (will terminate): " + desc);
+
+ HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
+ new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
+
+ for (HadoopTaskInfo info : tasks)
+ jobTracker.onTaskFinished(info, status);
+
+ runningProcsByJobId.remove(proc.jobId(), proc);
+ }
+
+ // Safety.
+ proc.terminate();
+ }
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }
+
+ /**
+ * Hadoop process: bookkeeping for one child process started for a job. Extends {@link ReentrantLock} so
+ * that message sending and termination can be serialized on the instance itself.
+ */
+ private static class HadoopProcess extends ReentrantLock {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ private final HadoopJobId jobId;
+
+ /** Process. Stays {@code null} until {@link #onInitialized} is called, i.e. also when init failed. */
+ private Process proc;
+
+ /** Init future. Completes when process is ready to receive messages. */
+ private final HadoopProcessFuture initFut;
+
+ /** Process descriptor. Stays {@code null} until {@link #onInitialized} is called. */
+ private HadoopProcessDescriptor procDesc;
+
+ /** Reducers planned for this process, {@code null} if none were planned. */
+ private Collection<Integer> reducers;
+
+ /** Tasks. */
+ private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
+
+ /** Terminated flag. */
+ private volatile boolean terminated;
+
+ /**
+ * @param jobId Job ID.
+ * @param initFut Init future.
+ * @param reducers Reducers planned for this process, may be {@code null} or empty.
+ */
+ private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut,
+ int[] reducers) {
+ this.jobId = jobId;
+ this.initFut = initFut;
+
+ if (!F.isEmpty(reducers)) {
+ this.reducers = new ArrayList<>(reducers.length);
+
+ for (int r : reducers)
+ this.reducers.add(r);
+ }
+ }
+
+ /**
+ * @return Communication process descriptor.
+ */
+ private HadoopProcessDescriptor descriptor() {
+ return procDesc;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * Initialized callback.
+ *
+ * @param proc Java process representation.
+ * @param procDesc Process descriptor.
+ */
+ private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
+ this.proc = proc;
+ this.procDesc = procDesc;
+ }
+
+ /**
+ * Terminates process (kills it).
+ * <p>
+ * Marks the process as terminated immediately. The underlying OS process is destroyed right away if
+ * the init future is already done, or as soon as it completes otherwise. If initialization failed
+ * there is no OS process to destroy and only the flag is set.
+ */
+ private void terminate() {
+ // Guard against concurrent message sending.
+ lock();
+
+ try {
+ terminated = true;
+
+ if (!initFut.isDone())
+ initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(
+ IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+ destroyIfStarted();
+ }
+ });
+ else
+ destroyIfStarted();
+ }
+ finally {
+ unlock();
+ }
+ }
+
+ /**
+ * Destroys the OS process if one was ever attached; no-op when initialization failed before that.
+ */
+ private void destroyIfStarted() {
+ Process proc0 = proc;
+
+ if (proc0 != null)
+ proc0.destroy();
+ }
+
+ /**
+ * @return Terminated flag.
+ */
+ private boolean terminated() {
+ return terminated;
+ }
+
+ /**
+ * Adds tasks to the collection of tasks tracked for this process.
+ *
+ * @param tasks Tasks to add.
+ */
+ private void addTasks(Collection<HadoopTaskInfo> tasks) {
+ this.tasks.addAll(tasks);
+ }
+
+ /**
+ * Removes task when it was completed.
+ *
+ * @param task Task to remove.
+ */
+ private void removeTask(HadoopTaskInfo task) {
+ tasks.remove(task);
+ }
+
+ /**
+ * @return Collection of tasks.
+ */
+ private Collection<HadoopTaskInfo> tasks() {
+ return tasks;
+ }
+
+ /**
+ * @return Planned reducers.
+ */
+ private Collection<Integer> reducers() {
+ return reducers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcess.class, this);
+ }
+ }
+
+ /**
+ * Init future of a child process. Completes successfully once both the process start has been observed
+ * ({@link #onProcessStarted}) and the child's reply has been received ({@link #onReplyReceived});
+ * completes with an error if the process failed to start.
+ */
+ private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Child process ID. */
+ private UUID childProcId;
+
+ /** Job ID. */
+ private HadoopJobId jobId;
+
+ /** Process descriptor. */
+ private HadoopProcessDescriptor desc;
+
+ /** Running process. */
+ private Process proc;
+
+ /** Process started flag. */
+ private volatile boolean procStarted;
+
+ /** Reply received flag. */
+ private volatile boolean replyReceived;
+
+ /** Logger. */
+ private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
+
+ /**
+ * Empty constructor.
+ */
+ public HadoopProcessFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param childProcId Child process ID.
+ * @param jobId Job ID.
+ * @param ctx Kernal context (not used by this constructor).
+ */
- private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) {
- super(ctx);
-
++ private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) {
+ this.childProcId = childProcId;
+ this.jobId = jobId;
+ }
+
+ /**
+ * Process started callback. Completes the future if the reply has already been received.
+ *
+ * @param proc Started process.
+ */
+ public void onProcessStarted(Process proc) {
+ this.proc = proc;
+
+ procStarted = true;
+
+ if (procStarted && replyReceived)
+ onDone(F.t(proc, desc));
+ }
+
+ /**
+ * Reply received callback. Completes the future if the process start has already been observed.
+ *
+ * @param desc Descriptor of the replying child process; its process ID must match this future's.
+ */
+ public void onReplyReceived(HadoopProcessDescriptor desc) {
+ assert childProcId.equals(desc.processId());
+
+ this.desc = desc;
+
+ replyReceived = true;
+
+ if (procStarted && replyReceived)
+ onDone(F.t(proc, desc));
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On success the registered process is initialized with the result before the future completes and,
+ * if reducers are planned for it, the job tracker is notified. On failure the process is removed
+ * from both lookup maps.
+ */
+ @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res,
+ @Nullable Throwable err) {
+ if (err == null) {
+ HadoopProcess proc = runningProcsByProcId.get(childProcId);
+
+ assert proc != null;
+
+ assert proc.initFut == this;
+
+ proc.onInitialized(res.get1(), res.get2());
+
+ if (!F.isEmpty(proc.reducers()))
+ jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
+ }
+ else {
+ // Clean up since init failed.
+ runningProcsByJobId.remove(jobId);
+ runningProcsByProcId.remove(childProcId);
+ }
+
+ if (super.onDone(res, err)) {
+ if (err == null) {
+ if (log.isDebugEnabled())
+ log.debug("Initialized child process for external task execution [jobId=" + jobId +
+ ", desc=" + desc + ", initTime=" + duration() + ']');
+ }
+ else
+ U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
+ ", desc=" + desc + ']', err);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 0000000,e95b8cb..831885f
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@@ -1,0 -1,440 +1,440 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.message.*;
+ import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.offheap.unsafe.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+
+ import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
+
+ /**
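+ * Hadoop child process runner: handles messages received by the external child process and executes the
+ * tasks requested by the parent node.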
+ */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ public class HadoopChildProcessRunner {
+ /** Node process descriptor. */
+ private HadoopProcessDescriptor nodeDesc;
+
+ /** Message processing executor service. */
+ private ExecutorService msgExecSvc;
+
+ /** Task executor service. */
+ private HadoopExecutorService execSvc;
+
+ /** Unsafe memory instance passed to the shuffle job and to every task runnable. */
+ protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ /** External communication. */
+ private HadoopExternalCommunication comm;
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Init guard. */
+ private final AtomicBoolean initGuard = new AtomicBoolean();
+
+ /** Start time. */
+ private long startTime;
+
+ /** Init future. */
- private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>();
++ private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
+
+ /** Job instance. */
+ private HadoopJob job;
+
+ /** Number of uncompleted tasks. */
+ private final AtomicInteger pendingTasks = new AtomicInteger();
+
+ /** Shuffle job. */
+ private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
+
+ /** Concurrent mappers. */
+ private int concMappers;
+
+ /** Concurrent reducers. */
+ private int concReducers;
+
+ /**
+ * Starts child process runner.
+ */
+     public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
+         ExecutorService msgExecSvc, IgniteLogger parentLog)
+         throws IgniteCheckedException {
+         this.comm = comm;
+         this.nodeDesc = nodeDesc;
+         this.msgExecSvc = msgExecSvc;
+
+         // Initialize everything the listener callbacks read BEFORE the listener is registered:
+         // MessageListener dereferences the logger unconditionally, so a callback delivered right
+         // after registration must not observe a null logger.
+         log = parentLog.getLogger(HadoopChildProcessRunner.class);
+
+         startTime = U.currentTimeMillis();
+
+         comm.setListener(new MessageListener());
+
+         // Acknowledge to the parent node that this process has started.
+         comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
+     }
+
+ /**
+ * Initializes process for task execution.
+ *
+ * @param req Initialization request.
+ */
+     private void prepareProcess(HadoopPrepareForJobRequest req) {
+         // Only the first request performs initialization; any later one is reported and dropped.
+         if (!initGuard.compareAndSet(false, true)) {
+             log.warning("Duplicate initialize process request received (will ignore): " + req);
+
+             return;
+         }
+
+         try {
+             if (log.isDebugEnabled())
+                 log.debug("Initializing external hadoop task: " + req);
+
+             assert job == null;
+
+             job = req.jobInfo().createJob(req.jobId(), log);
+
+             job.initialize(true, nodeDesc.processId());
+
+             shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
+                 req.totalReducerCount(), req.localReducers());
+
+             initializeExecutors(req);
+
+             if (log.isDebugEnabled()) {
+                 // Time elapsed since start() recorded the start timestamp.
+                 long initWaitTime = U.currentTimeMillis() - startTime;
+
+                 log.debug("External process initialized [initWaitTime=" + initWaitTime + ']');
+             }
+
+             // Complete the init future successfully so that pending listeners can proceed.
+             initFut.onDone(null, null);
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to initialize process: " + req, e);
+
+             // Complete the init future with the error.
+             initFut.onDone(e);
+         }
+     }
+
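+ /**
+ * Submits the requested tasks for execution once the process initialization future completes.
+ *
+ * @param req Task execution request.
+ */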
+     private void runTasks(final HadoopTaskExecutionRequest req) {
+         boolean initPending = !initFut.isDone();
+
+         if (initPending && log.isDebugEnabled())
+             log.debug("Will wait for process initialization future completion: " + req);
+
+         initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+             @Override public void apply(IgniteInternalFuture<?> f) {
+                 try {
+                     // Throws if initialization failed; the catch block below then fails every requested task.
+                     f.get();
+
+                     boolean cntSet = pendingTasks.compareAndSet(0, req.tasks().size());
+
+                     assert cntSet;
+
+                     HadoopTaskInfo first = F.first(req.tasks());
+
+                     assert first != null;
+
+                     // Executor resizing is disabled here, so this value is only reported in the debug message.
+                     int size;
+
+                     if (first.type() == MAP)
+                         size = concMappers;
+                     else
+                         size = concReducers;
+
+                     if (log.isDebugEnabled())
+                         log.debug("Set executor service size for task type [type=" + first.type() +
+                             ", size=" + size + ']');
+
+                     for (HadoopTaskInfo taskInfo : req.tasks()) {
+                         if (log.isDebugEnabled())
+                             log.debug("Submitted task for external execution: " + taskInfo);
+
+                         // Each runnable reports completion to this runner and takes its input/output
+                         // from the shuffle job.
+                         execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
+                             @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                                 onTaskFinished0(this, status);
+                             }
+
+                             @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx)
+                                 throws IgniteCheckedException {
+                                 return shuffleJob.input(ctx);
+                             }
+
+                             @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx)
+                                 throws IgniteCheckedException {
+                                 return shuffleJob.output(ctx);
+                             }
+                         });
+                     }
+                 }
+                 catch (IgniteCheckedException e) {
+                     // Report every task of the request as failed, without flushing.
+                     for (HadoopTaskInfo failedTask : req.tasks())
+                         notifyTaskFinished(failedTask, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                 }
+             }
+         });
+     }
+
+ /**
+ * Creates executor services.
+ *
+ * @param req Init child process request.
+ */
+ private void initializeExecutors(HadoopPrepareForJobRequest req) {
+ int cpus = Runtime.getRuntime().availableProcessors();
+ //
+ // concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
+ // concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
+
+ execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
+ }
+
+ /**
+ * Updates external process map so that shuffle can proceed with sending messages to reducers.
+ *
+ * @param req Update request.
+ */
+ private void updateTasks(final HadoopJobInfoUpdateRequest req) {
+ initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> gridFut) {
+ assert initGuard.get();
+
+ assert req.jobId().equals(job.id());
+
+ if (req.reducersAddresses() != null) {
+ if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
+ shuffleJob.startSending("external",
+ new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
+ @Override public void applyx(HadoopProcessDescriptor dest,
+ HadoopShuffleMessage msg) throws IgniteCheckedException {
+ comm.sendMessage(dest, msg);
+ }
+ });
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Stops all executors and running tasks.
+ */
+     private void shutdown() {
+         if (execSvc != null)
+             execSvc.shutdown(5000);
+
+         if (msgExecSvc != null)
+             msgExecSvc.shutdownNow();
+
+         // The job is created only in prepareProcess(), but shutdown() may run before that (for example
+         // when the connection to the parent is lost early). Without this guard a NullPointerException
+         // would escape and the callers would never reach terminate().
+         if (job == null)
+             return;
+
+         try {
+             job.dispose(true);
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to dispose job.", e);
+         }
+     }
+
+ /**
+ * Notifies node about task finish.
+ *
+ * @param run Finished task runnable.
+ * @param status Task status.
+ */
+ private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) {
+ HadoopTaskInfo info = run.taskInfo();
+
+ int pendingTasks0 = pendingTasks.decrementAndGet();
+
+ if (log.isDebugEnabled())
+ log.debug("Hadoop task execution finished [info=" + info
+ + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
+ ", pendingTasks=" + pendingTasks0 +
+ ", err=" + status.failCause() + ']');
+
+ assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
+
+ boolean flush = pendingTasks0 == 0 && info.type() == MAP;
+
+ notifyTaskFinished(info, status, flush);
+ }
+
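+ /**
+ * Notifies the parent node about task completion, optionally flushing shuffle messages first.
+ *
+ * @param taskInfo Finished task info.
+ * @param status Task status.
+ * @param flush If {@code true}, shuffle messages are flushed before the notification is sent.
+ */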
+     private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status,
+         boolean flush) {
+         final HadoopTaskState state = status.state();
+         final Throwable err = status.failCause();
+
+         if (flush) {
+             if (log.isDebugEnabled())
+                 log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
+                     taskInfo + ", state=" + state + ", err=" + err + ']');
+
+             final long start = U.currentTimeMillis();
+
+             try {
+                 // The notification itself is sent from the listener once the flush future completes.
+                 shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                     @Override public void apply(IgniteInternalFuture<?> f) {
+                         long end = U.currentTimeMillis();
+
+                         if (log.isDebugEnabled())
+                             log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
+                                 ", flushTime=" + (end - start) + ']');
+
+                         try {
+                             // Check for errors on shuffle.
+                             f.get();
+
+                             notifyTaskFinished(taskInfo, status, false);
+                         }
+                         catch (IgniteCheckedException e) {
+                             log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                                 ", state=" + state + ", err=" + err + ']', e);
+
+                             notifyTaskFinished(taskInfo,
+                                 new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                         }
+                     }
+                 });
+             }
+             catch (IgniteCheckedException e) {
+                 log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                     ", state=" + state + ", err=" + err + ']', e);
+
+                 notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+             }
+
+             return;
+         }
+
+         // No flush requested: send the completion message to the parent node right away.
+         try {
+             if (log.isDebugEnabled())
+                 log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
+                     ", err=" + err + ']');
+
+             comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
+         }
+         catch (IgniteCheckedException e) {
+             log.error("Failed to send message to parent node (will terminate child process).", e);
+
+             shutdown();
+
+             terminate();
+         }
+     }
+
+ /**
+ * Checks if message was received from parent node and prints warning if not.
+ *
+ * @param desc Sender process descriptor.
+ * @param msg Received message.
+ * @return {@code True} if received from parent node.
+ */
+     private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
+         // The sender counts as the parent node when its process ID equals the one in nodeDesc.
+         boolean fromParent = nodeDesc.processId().equals(desc.processId());
+
+         if (!fromParent)
+             log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
+                 ", msg=" + msg + ']');
+
+         return fromParent;
+     }
+
+ /**
+ * Stops execution of this process.
+ */
+ private void terminate() {
+ System.exit(1); // Exits the JVM with status 1; callers in this class invoke shutdown() right before this.
+ }
+
+ /**
+ * Message listener.
+ */
+     private class MessageListener implements HadoopMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
+             // Control requests are accepted only when validateNodeMessage() confirms the sender is the parent node.
+             if (msg instanceof HadoopTaskExecutionRequest) {
+                 if (validateNodeMessage(desc, msg))
+                     runTasks((HadoopTaskExecutionRequest)msg);
+             }
+             else if (msg instanceof HadoopJobInfoUpdateRequest) {
+                 if (validateNodeMessage(desc, msg))
+                     updateTasks((HadoopJobInfoUpdateRequest)msg);
+             }
+             else if (msg instanceof HadoopPrepareForJobRequest) {
+                 if (validateNodeMessage(desc, msg))
+                     prepareProcess((HadoopPrepareForJobRequest)msg);
+             }
+             else if (msg instanceof HadoopShuffleMessage) {
+                 if (log.isTraceEnabled())
+                     log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
+
+                 // Shuffle messages are processed only after the init future completes.
+                 initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                     @Override public void apply(IgniteInternalFuture<?> f) {
+                         try {
+                             // Make sure init was successful: after a failed init the shuffle job may be
+                             // unset, which would end in an uncaught NullPointerException below.
+                             f.get();
+
+                             HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+
+                             shuffleJob.onShuffleMessage(m);
+
+                             // Acknowledge the message to its sender.
+                             comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
+                         }
+                         catch (IgniteCheckedException e) {
+                             U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
+                         }
+                     }
+                 });
+             }
+             else if (msg instanceof HadoopShuffleAck) {
+                 if (log.isTraceEnabled())
+                     log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
+
+                 shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
+             }
+             else
+                 log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
+         }
+
+         /** {@inheritDoc} */
+         @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+             if (log.isDebugEnabled())
+                 log.debug("Lost connection with remote process: " + desc);
+
+             if (desc == null)
+                 U.warn(log, "Handshake failed.");
+             else if (desc.processId().equals(nodeDesc.processId())) {
+                 // Losing the parent node is fatal for this process: stop everything and exit.
+                 log.warning("Child process lost connection with parent node (will terminate child process).");
+
+                 shutdown();
+
+                 terminate();
+             }
+         }
+     }
+ }