Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:05 UTC
[73/92] [abbrv] ignite git commit: WIP.
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java
deleted file mode 100644
index 8561dab..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/HadoopShuffle.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle;
-
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
-
-/**
- * Shuffle.
- */
-public class HadoopShuffle extends HadoopComponent {
- /** */
- private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
-
- /** */
- protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- /** {@inheritDoc} */
- @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
- super.start(ctx);
-
- ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
- new IgniteBiPredicate<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- return onMessageReceived(nodeId, (HadoopMessage)msg);
- }
- });
- }
-
- /**
- * Stops shuffle.
- *
- * @param cancel Whether to cancel all ongoing activities.
- */
- @Override public void stop(boolean cancel) {
- for (HadoopShuffleJob job : jobs.values()) {
- try {
- job.close();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to close job.", e);
- }
- }
-
- jobs.clear();
- }
-
- /**
- * Creates new shuffle job.
- *
- * @param jobId Job ID.
- * @return Created shuffle job.
- * @throws IgniteCheckedException If job creation failed.
- */
- private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
- HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
-
- HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
- ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
-
- UUID[] rdcAddrs = new UUID[plan.reducers()];
-
- for (int i = 0; i < rdcAddrs.length; i++) {
- UUID nodeId = plan.nodeForReducer(i);
-
- assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
-
- rdcAddrs[i] = nodeId;
- }
-
- boolean init = job.initializeReduceAddresses(rdcAddrs);
-
- assert init;
-
- return job;
- }
-
- /**
- * @param nodeId Node ID to send message to.
- * @param msg Message to send.
- * @throws IgniteCheckedException If send failed.
- */
- private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
- ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
-
- ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
- }
-
- /**
- * @param jobId Job ID.
- * @return Shuffle job.
- * @throws IgniteCheckedException If the shuffle job could not be created.
- */
- private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
- HadoopShuffleJob<UUID> res = jobs.get(jobId);
-
- if (res == null) {
- res = newJob(jobId);
-
- HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
-
- if (old != null) {
- res.close();
-
- res = old;
- }
- else if (res.reducersInitialized())
- startSending(res);
- }
-
- return res;
- }
-
- /**
- * Starts message sending thread.
- *
- * @param shuffleJob Job to start sending for.
- */
- private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
- shuffleJob.startSending(ctx.kernalContext().gridName(),
- new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
- @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
- send0(dest, msg);
- }
- }
- );
- }
-
- /**
- * Message received callback.
- *
- * @param src Sender node ID.
- * @param msg Received message.
- * @return {@code True}.
- */
- public boolean onMessageReceived(UUID src, HadoopMessage msg) {
- if (msg instanceof HadoopShuffleMessage) {
- HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
-
- try {
- job(m.jobId()).onShuffleMessage(m);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Message handling failed.", e);
- }
-
- try {
- // Reply with ack.
- send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
- }
- }
- else if (msg instanceof HadoopShuffleAck) {
- HadoopShuffleAck m = (HadoopShuffleAck)msg;
-
- try {
- job(m.jobId()).onShuffleAck(m);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Message handling failed.", e);
- }
- }
- else
- throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
- ", msg=" + msg + ']');
-
- return true;
- }
-
- /**
- * @param taskCtx Task context.
- * @return Output.
- * @throws IgniteCheckedException If failed.
- */
- public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return job(taskCtx.taskInfo().jobId()).output(taskCtx);
- }
-
- /**
- * @param taskCtx Task context.
- * @return Input.
- * @throws IgniteCheckedException If failed.
- */
- public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return job(taskCtx.taskInfo().jobId()).input(taskCtx);
- }
-
- /**
- * @param jobId Job ID.
- */
- public void jobFinished(HadoopJobId jobId) {
- HadoopShuffleJob job = jobs.remove(jobId);
-
- if (job != null) {
- try {
- job.close();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to close job: " + jobId, e);
- }
- }
- }
-
- /**
- * Flushes all the outputs for the given job to remote nodes.
- *
- * @param jobId Job ID.
- * @return Future.
- */
- public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
- HadoopShuffleJob job = jobs.get(jobId);
-
- if (job == null)
- return new GridFinishedFuture<>();
-
- try {
- return job.flush();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- /**
- * @return Memory.
- */
- public GridUnsafeMemory memory() {
- return mem;
- }
-}
\ No newline at end of file
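
A note on the job() method in the file above: it lazily creates shuffle jobs with the classic ConcurrentMap get/putIfAbsent idiom, where the thread that loses the creation race closes its speculatively created instance instead of leaking it. A minimal sketch of the idiom, assuming a hypothetical ShuffleJob stand-in for HadoopShuffleJob:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class ShuffleJobRegistry {
        /** Hypothetical stand-in for HadoopShuffleJob. */
        static class ShuffleJob {
            void close() { /* Release buffers, threads, etc. */ }
        }

        private final ConcurrentMap<Long, ShuffleJob> jobs = new ConcurrentHashMap<>();

        ShuffleJob job(long jobId) {
            ShuffleJob res = jobs.get(jobId);

            if (res == null) {
                res = new ShuffleJob(); // Speculative creation, no lock held.

                ShuffleJob old = jobs.putIfAbsent(jobId, res);

                if (old != null) {
                    res.close(); // Lost the race: discard our instance.

                    res = old;
                }
            }

            return res;
        }
    }

In the real class the winning thread additionally calls startSending() once reducer addresses are initialized; the sketch keeps only the registration race.
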
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java
deleted file mode 100644
index c2ac017..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopEmbeddedTaskExecutor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-
-/**
- * Task executor.
- */
-public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
- /** Job tracker. */
- private HadoopJobTracker jobTracker;
-
- /** */
- private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
-
- /** Executor service to run tasks. */
- private HadoopExecutorService exec;
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- jobTracker = ctx.jobTracker();
-
- exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
- ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (exec != null) {
- exec.shutdown(3000);
-
- if (cancel) {
- for (HadoopJobId jobId : jobs.keySet())
- cancelTasks(jobId);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) {
- if (exec != null && !exec.shutdown(30000))
- U.warn(log, "Failed to finish running tasks in 30 sec.");
- }
-
- /** {@inheritDoc} */
- @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
- ", tasksCnt=" + tasks.size() + ']');
-
- Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id());
-
- if (executedTasks == null) {
- executedTasks = new GridConcurrentHashSet<>();
-
- Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
-
- assert extractedCol == null;
- }
-
- final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks;
-
- for (final HadoopTaskInfo info : tasks) {
- assert info != null;
-
- HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
- ctx.localNodeId()) {
- @Override protected void onTaskFinished(HadoopTaskStatus status) {
- if (log.isDebugEnabled())
- log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
- "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
-
- finalExecutedTasks.remove(this);
-
- jobTracker.onTaskFinished(info, status);
- }
-
- @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return ctx.shuffle().input(taskCtx);
- }
-
- @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return ctx.shuffle().output(taskCtx);
- }
- };
-
- executedTasks.add(task);
-
- exec.submit(task);
- }
- }
-
- /**
- * Cancels all currently running tasks for the given job ID and cancels scheduled execution of tasks
- * for this job ID.
- * <p>
- * It is guaranteed that this method will not be called concurrently with
- * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)}, and that no more
- * task submissions will be performed via that method for the given job ID after this method is called.
- *
- * @param jobId Job ID to cancel.
- */
- @Override public void cancelTasks(HadoopJobId jobId) {
- Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
-
- if (executedTasks != null) {
- for (HadoopRunnableTask task : executedTasks)
- task.cancel();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
- if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
- Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
-
- assert executedTasks == null || executedTasks.isEmpty();
- }
- }
-}
\ No newline at end of file
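
The run() method in the file above registers every runnable task in a per-job concurrent set so that cancelTasks(jobId) can cancel whatever is still in flight, and each task removes itself in its onTaskFinished() callback. A minimal sketch using plain JDK collections (GridConcurrentHashSet is Ignite-internal); note the real run() uses a plain put() with an assert because submissions for one job are never concurrent, while this sketch uses computeIfAbsent to stay self-contained:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class TaskRegistry {
        /** Hypothetical stand-in for HadoopRunnableTask. */
        interface Task {
            void cancel();
        }

        private final ConcurrentMap<Long, Collection<Task>> jobs = new ConcurrentHashMap<>();

        /** Called for every submitted task. */
        void register(long jobId, Task task) {
            jobs.computeIfAbsent(jobId,
                id -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(task);
        }

        /** Called from the task-finished callback. */
        void onTaskFinished(long jobId, Task task) {
            Collection<Task> tasks = jobs.get(jobId);

            if (tasks != null)
                tasks.remove(task);
        }

        /** Cancels whatever is still in flight for the job. */
        void cancelTasks(long jobId) {
            Collection<Task> tasks = jobs.get(jobId);

            if (tasks != null) {
                for (Task task : tasks)
                    task.cancel();
            }
        }
    }
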
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java
deleted file mode 100644
index 8704c7b..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorService.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
-
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.internal.util.worker.GridWorkerListener;
-import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
-import org.apache.ignite.thread.IgniteThread;
-import org.jsr166.ConcurrentHashMap8;
-
-import static java.util.Collections.newSetFromMap;
-
-/**
- * Executor service without thread pooling.
- */
-public class HadoopExecutorService {
- /** */
- private final LinkedBlockingQueue<Callable<?>> queue;
-
- /** */
- private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
-
- /** */
- private final AtomicInteger active = new AtomicInteger();
-
- /** */
- private final int maxTasks;
-
- /** */
- private final String gridName;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private volatile boolean shutdown;
-
- /** */
- private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
- @Override public void onStopped(GridWorker w) {
- workers.remove(w);
-
- if (shutdown) {
- active.decrementAndGet();
-
- return;
- }
-
- Callable<?> task = queue.poll();
-
- if (task != null)
- startThread(task);
- else {
- active.decrementAndGet();
-
- if (!queue.isEmpty())
- startFromQueue();
- }
- }
- };
-
- /**
- * @param log Logger.
- * @param gridName Grid name.
- * @param maxTasks Max number of tasks.
- * @param maxQueue Max queue length.
- */
- public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
- assert maxTasks > 0 : maxTasks;
- assert maxQueue > 0 : maxQueue;
-
- this.maxTasks = maxTasks;
- this.queue = new LinkedBlockingQueue<>(maxQueue);
- this.gridName = gridName;
- this.log = log.getLogger(HadoopExecutorService.class);
- }
-
- /**
- * @return Number of active workers.
- */
- public int active() {
- return workers.size();
- }
-
- /**
- * Submit task.
- *
- * @param task Task.
- */
- public void submit(Callable<?> task) {
- while (queue.isEmpty()) {
- int active0 = active.get();
-
- if (active0 == maxTasks)
- break;
-
- if (active.compareAndSet(active0, active0 + 1)) {
- startThread(task);
-
- return; // Started in new thread bypassing queue.
- }
- }
-
- try {
- while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
- if (shutdown)
- return; // Rejected due to shutdown.
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- return;
- }
-
- startFromQueue();
- }
-
- /**
- * Attempts to start task from queue.
- */
- private void startFromQueue() {
- do {
- int active0 = active.get();
-
- if (active0 == maxTasks)
- break;
-
- if (active.compareAndSet(active0, active0 + 1)) {
- Callable<?> task = queue.poll();
-
- if (task == null) {
- int res = active.decrementAndGet();
-
- assert res >= 0 : res;
-
- break;
- }
-
- startThread(task);
- }
- }
- while (!queue.isEmpty());
- }
-
- /**
- * @param task Task.
- */
- private void startThread(final Callable<?> task) {
- String workerName;
-
- if (task instanceof HadoopRunnableTask) {
- final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
-
- workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
- }
- else
- workerName = task.toString();
-
- GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
- @Override protected void body() {
- try {
- task.call();
- }
- catch (Exception e) {
- log.error("Failed to execute task: " + task, e);
- }
- }
- };
-
- workers.add(w);
-
- if (shutdown)
- w.cancel();
-
- new IgniteThread(w).start();
- }
-
- /**
- * Shuts down this executor service.
- *
- * @param awaitTimeMillis Time in milliseconds to wait for task completion.
- * @return {@code true} if all tasks completed.
- */
- public boolean shutdown(long awaitTimeMillis) {
- shutdown = true;
-
- for (GridWorker w : workers)
- w.cancel();
-
- while (awaitTimeMillis > 0 && !workers.isEmpty()) {
- try {
- Thread.sleep(100);
-
- awaitTimeMillis -= 100;
- }
- catch (InterruptedException e) {
- break;
- }
- }
-
- return workers.isEmpty();
- }
-
- /**
- * @return {@code true} if {@linkplain #shutdown(long)} was already called.
- */
- public boolean isShutdown() {
- return shutdown;
- }
-}
\ No newline at end of file
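
HadoopExecutorService above bounds concurrency without a conventional thread pool: submit() first tries to claim a worker slot by CAS-incrementing the active counter and only falls back to the bounded queue when saturated, while the worker-stopped listener either picks up the next queued task or releases the slot. A minimal sketch of just the lock-free slot accounting, under those assumptions (illustrative names, not the Ignite API):

    import java.util.concurrent.atomic.AtomicInteger;

    class SlotCounter {
        private final AtomicInteger active = new AtomicInteger();

        private final int maxTasks;

        SlotCounter(int maxTasks) {
            this.maxTasks = maxTasks;
        }

        /** @return Whether a slot was claimed and the caller may start a thread. */
        boolean tryAcquire() {
            for (;;) {
                int active0 = active.get();

                if (active0 == maxTasks)
                    return false; // Saturated: the task must be queued instead.

                if (active.compareAndSet(active0, active0 + 1))
                    return true; // Slot claimed atomically, start a worker.
            }
        }

        /** Called when a worker stops and no queued task replaces it. */
        void release() {
            active.decrementAndGet();
        }
    }

The re-check of the queue after decrementing (startFromQueue() in the real code) closes the race where a task is enqueued just as the last worker exits.
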
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java
deleted file mode 100644
index 8415d6f..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopRunnableTask.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopHashMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopSkipList;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
-
-/**
- * Runnable task.
- */
-public abstract class HadoopRunnableTask implements Callable<Void> {
- /** */
- private final GridUnsafeMemory mem;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final HadoopJob job;
-
- /** Task to run. */
- private final HadoopTaskInfo info;
-
- /** Submit time. */
- private final long submitTs = U.currentTimeMillis();
-
- /** Execution start timestamp. */
- private long execStartTs;
-
- /** Execution end timestamp. */
- private long execEndTs;
-
- /** */
- private HadoopMultimap combinerInput;
-
- /** */
- private volatile HadoopTaskContext ctx;
-
- /** Set if the task is being cancelled. */
- private volatile boolean cancelled;
-
- /** Node id. */
- private UUID nodeId;
-
- /**
- * @param log Log.
- * @param job Job.
- * @param mem Memory.
- * @param info Task info.
- * @param nodeId Node id.
- */
- protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
- UUID nodeId) {
- this.nodeId = nodeId;
- this.log = log.getLogger(HadoopRunnableTask.class);
- this.job = job;
- this.mem = mem;
- this.info = info;
- }
-
- /**
- * @return Wait time.
- */
- public long waitTime() {
- return execStartTs - submitTs;
- }
-
- /**
- * @return Execution time.
- */
- public long executionTime() {
- return execEndTs - execStartTs;
- }
-
- /** {@inheritDoc} */
- @Override public Void call() throws IgniteCheckedException {
- ctx = job.getTaskContext(info);
-
- return ctx.runAsJobOwner(new Callable<Void>() {
- @Override public Void call() throws Exception {
- call0();
-
- return null;
- }
- });
- }
-
- /**
- * Implements actual task execution.
- *
- * @throws IgniteCheckedException If the task execution failed.
- */
- void call0() throws IgniteCheckedException {
- execStartTs = U.currentTimeMillis();
-
- Throwable err = null;
-
- HadoopTaskState state = HadoopTaskState.COMPLETED;
-
- HadoopPerformanceCounter perfCntr = null;
-
- try {
- perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
-
- perfCntr.onTaskSubmit(info, submitTs);
- perfCntr.onTaskPrepare(info, execStartTs);
-
- ctx.prepareTaskEnvironment();
-
- runTask(perfCntr);
-
- if (info.type() == MAP && job.info().hasCombiner()) {
- ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
-
- try {
- runTask(perfCntr);
- }
- finally {
- ctx.taskInfo(info);
- }
- }
- }
- catch (HadoopTaskCancelledException ignored) {
- state = HadoopTaskState.CANCELED;
- }
- catch (Throwable e) {
- state = HadoopTaskState.FAILED;
- err = e;
-
- U.error(log, "Task execution failed.", e);
-
- if (e instanceof Error)
- throw e;
- }
- finally {
- execEndTs = U.currentTimeMillis();
-
- if (perfCntr != null)
- perfCntr.onTaskFinish(info, execEndTs);
-
- onTaskFinished(new HadoopTaskStatus(state, err, ctx == null ? null : ctx.counters()));
-
- if (combinerInput != null)
- combinerInput.close();
-
- if (ctx != null)
- ctx.cleanupTaskEnvironment();
- }
- }
-
- /**
- * @param perfCntr Performance counter.
- * @throws IgniteCheckedException If failed.
- */
- private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- try (HadoopTaskOutput out = createOutputInternal(ctx);
- HadoopTaskInput in = createInputInternal(ctx)) {
-
- ctx.input(in);
- ctx.output(out);
-
- perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
-
- ctx.run();
- }
- }
-
- /**
- * Cancel the executed task.
- */
- public void cancel() {
- cancelled = true;
-
- if (ctx != null)
- ctx.cancel();
- }
-
- /**
- * @param status Task status.
- */
- protected abstract void onTaskFinished(HadoopTaskStatus status);
-
- /**
- * @param ctx Task context.
- * @return Task input.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
- switch (ctx.taskInfo().type()) {
- case SETUP:
- case MAP:
- case COMMIT:
- case ABORT:
- return null;
-
- case COMBINE:
- assert combinerInput != null;
-
- return combinerInput.input(ctx);
-
- default:
- return createInput(ctx);
- }
- }
-
- /**
- * @param ctx Task context.
- * @return Input.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
-
- /**
- * @param ctx Task info.
- * @return Output.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
-
- /**
- * @param ctx Task info.
- * @return Task output.
- * @throws IgniteCheckedException If failed.
- */
- private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
- switch (ctx.taskInfo().type()) {
- case SETUP:
- case REDUCE:
- case COMMIT:
- case ABORT:
- return null;
-
- case MAP:
- if (job.info().hasCombiner()) {
- assert combinerInput == null;
-
- combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
- new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
- new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
-
- return combinerInput.startAdding(ctx);
- }
-
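- // Fall through: no combiner configured, so map output goes straight to the shuffle.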
- default:
- return createOutput(ctx);
- }
- }
-
- /**
- * @return Task info.
- */
- public HadoopTaskInfo taskInfo() {
- return info;
- }
-}
\ No newline at end of file
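
HadoopRunnableTask above records three timestamps (submit at construction, start and end around the body) and derives waitTime() and executionTime() from them; runTask() additionally opens the task input and output in a try-with-resources so both are closed even when the task throws. A condensed sketch of the timing part, with illustrative names rather than the Ignite API:

    import java.util.concurrent.Callable;

    abstract class TimedTask implements Callable<Void> {
        /** Taken when the task object is created, i.e. at submission. */
        private final long submitTs = System.currentTimeMillis();

        private volatile long execStartTs;

        private volatile long execEndTs;

        @Override public Void call() throws Exception {
            execStartTs = System.currentTimeMillis();

            try {
                body();
            }
            finally {
                // Recorded on success, failure and cancellation alike.
                execEndTs = System.currentTimeMillis();
            }

            return null;
        }

        /** @return Time spent queued before execution started. */
        long waitTime() {
            return execStartTs - submitTs;
        }

        /** @return Time spent executing. */
        long executionTime() {
            return execEndTs - execStartTs;
        }

        protected abstract void body() throws Exception;
    }
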
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java
deleted file mode 100644
index 7819367..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskExecutorAdapter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata;
-
-/**
- * Common superclass for task executor.
- */
-public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
- /**
- * Runs tasks.
- *
- * @param job Job.
- * @param tasks Tasks.
- * @throws IgniteCheckedException If failed.
- */
- public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException;
-
- /**
- * Cancels all currently running tasks for the given job ID and cancels scheduled execution of tasks
- * for this job ID.
- * <p>
- * It is guaranteed that this method will not be called concurrently with
- * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)}, and that no more
- * task submissions will be performed via that method for the given job ID after this method is called.
- *
- * @param jobId Job ID to cancel.
- */
- public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException;
-
- /**
- * On job state change callback.
- *
- * @param meta Job metadata.
- */
- public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java
deleted file mode 100644
index 62ba932..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
-
-/**
- * State of the task.
- */
-public enum HadoopTaskState {
- /** Running task. */
- RUNNING,
-
- /** Completed task. */
- COMPLETED,
-
- /** Failed task. */
- FAILED,
-
- /** Canceled task. */
- CANCELED,
-
- /** Process crashed. */
- CRASHED
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java
deleted file mode 100644
index c45616e..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopTaskStatus.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Task status.
- */
-public class HadoopTaskStatus implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private HadoopTaskState state;
-
- /** */
- private Throwable failCause;
-
- /** */
- private HadoopCounters cntrs;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public HadoopTaskStatus() {
- // No-op.
- }
-
- /**
- * Creates new instance.
- *
- * @param state Task state.
- * @param failCause Failure cause (if any).
- */
- public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) {
- this(state, failCause, null);
- }
-
- /**
- * Creates new instance.
- *
- * @param state Task state.
- * @param failCause Failure cause (if any).
- * @param cntrs Task counters.
- */
- public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause,
- @Nullable HadoopCounters cntrs) {
- assert state != null;
-
- this.state = state;
- this.failCause = failCause;
- this.cntrs = cntrs;
- }
-
- /**
- * @return State.
- */
- public HadoopTaskState state() {
- return state;
- }
-
- /**
- * @return Fail cause.
- */
- @Nullable public Throwable failCause() {
- return failCause;
- }
-
- /**
- * @return Counters.
- */
- @Nullable public HadoopCounters counters() {
- return cntrs;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopTaskStatus.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(state);
- out.writeObject(failCause);
- out.writeObject(cntrs);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- state = (HadoopTaskState)in.readObject();
- failCause = (Throwable)in.readObject();
- cntrs = (HadoopCounters)in.readObject();
- }
-}
\ No newline at end of file
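
HadoopTaskStatus above uses Externalizable, so deserialization first invokes the mandatory no-arg constructor and then readExternal() restores exactly the fields that writeExternal() wrote, in the same order. A small round-trip sketch with plain JDK streams (applicable to any Externalizable class, not just this one):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    class ExternalizableRoundTrip {
        static byte[] write(Object status) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(status); // Delegates to writeExternal() for Externalizable objects.
            }

            return bos.toByteArray();
        }

        static Object read(byte[] bytes) throws Exception {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return in.readObject(); // No-arg constructor first, then readExternal().
            }
        }
    }
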
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java
deleted file mode 100644
index a330650..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ /dev/null
@@ -1,976 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.impl.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskExecutorAdapter;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.child.HadoopExternalProcessStarter;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopExternalCommunication;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopMessageListener;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.spi.IgnitePortProtocol;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
-
-import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.CRASHED;
-import static org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState.FAILED;
-
-/**
- * External process registry. Handles external process lifecycle.
- */
-public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
- /** Hadoop context. */
- private HadoopContext ctx;
-
- /** */
- private String javaCmd;
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Node process descriptor. */
- private HadoopProcessDescriptor nodeDesc;
-
- /** Output base. */
- private File outputBase;
-
- /** Path separator. */
- private String pathSep;
-
- /** Hadoop external communication. */
- private HadoopExternalCommunication comm;
-
- /** Running processes indexed by process ID. */
- private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
-
- /** Running processes indexed by job ID. */
- private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
-
- /** Busy lock. */
- private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
-
- /** Job tracker. */
- private HadoopJobTracker jobTracker;
-
- /** {@inheritDoc} */
- @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
- this.ctx = ctx;
-
- log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
-
- outputBase = U.resolveWorkDirectory("hadoop", false);
-
- pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
-
- initJavaCommand();
-
- comm = new HadoopExternalCommunication(
- ctx.localNodeId(),
- UUID.randomUUID(),
- ctx.kernalContext().config().getMarshaller(),
- log,
- ctx.kernalContext().getSystemExecutorService(),
- ctx.kernalContext().gridName());
-
- comm.setListener(new MessageListener());
-
- comm.start();
-
- nodeDesc = comm.localProcessDescriptor();
-
- ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
- HadoopExternalTaskExecutor.class);
-
- if (nodeDesc.sharedMemoryPort() != -1)
- ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
- HadoopExternalTaskExecutor.class);
-
- jobTracker = ctx.jobTracker();
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) {
- busyLock.writeLock();
-
- try {
- comm.stop();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
- final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
-
- // If we have a local process for this job.
- if (proc != null) {
- if (log.isDebugEnabled())
- log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
-
- if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
- if (log.isDebugEnabled())
- log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
- ", proc=" + proc + ']');
-
- runningProcsByJobId.remove(meta.jobId());
- runningProcsByProcId.remove(proc.descriptor().processId());
-
- proc.terminate();
-
- return;
- }
-
- if (proc.initFut.isDone()) {
- if (!proc.initFut.isFailed())
- sendJobInfoUpdate(proc, meta);
- else if (log.isDebugEnabled())
- log.debug("Failed to initialize child process (will skip job state notification) " +
- "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
- }
- else {
- proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
- @Override
- public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
- try {
- f.get();
-
- sendJobInfoUpdate(proc, meta);
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to initialize child process (will skip job state notification) " +
- "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
- }
-
- }
- });
- }
- }
- else if (ctx.isParticipating(meta)) {
- HadoopJob job;
-
- try {
- job = jobTracker.job(meta.jobId(), meta.jobInfo());
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to get job: " + meta.jobId(), e);
-
- return;
- }
-
- startProcess(job, meta.mapReducePlan());
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
- if (!busyLock.tryReadLock()) {
- if (log.isDebugEnabled())
- log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
-
- return;
- }
-
- try {
- HadoopProcess proc = runningProcsByJobId.get(job.id());
-
- HadoopTaskType taskType = F.first(tasks).type();
-
- if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT ||
- taskType == HadoopTaskType.COMMIT) {
- if (proc == null || proc.terminated()) {
- runningProcsByJobId.remove(job.id(), proc);
-
- // Start new process for ABORT task since previous processes were killed.
- proc = startProcess(job, jobTracker.plan(job.id()));
-
- if (log.isDebugEnabled())
- log.debug("Starting new process for maintenance task [jobId=" + job.id() +
- ", proc=" + proc + ", taskType=" + taskType + ']');
- }
- }
- else
- assert proc != null : "Missing started process for task execution request: " + job.id() +
- ", tasks=" + tasks;
-
- final HadoopProcess proc0 = proc;
-
- proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
- @Override public void apply(
- IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
- if (!busyLock.tryReadLock())
- return;
-
- try {
- f.get();
-
- proc0.addTasks(tasks);
-
- if (log.isDebugEnabled())
- log.debug("Sending task execution request to child process [jobId=" + job.id() +
- ", proc=" + proc0 + ", tasks=" + tasks + ']');
-
- sendExecutionRequest(proc0, job, tasks);
- }
- catch (IgniteCheckedException e) {
- notifyTasksFailed(tasks, FAILED, e);
- }
- finally {
- busyLock.readUnlock();
- }
- }
- });
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancelTasks(HadoopJobId jobId) {
- HadoopProcess proc = runningProcsByJobId.get(jobId);
-
- if (proc != null)
- proc.terminate();
- }
-
- /**
- * Sends execution request to the child process.
- *
- * @param proc Process to send request to.
- * @param job Job instance.
- * @param tasks Collection of tasks to execute in started process.
- */
- private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks)
- throws IgniteCheckedException {
- // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
- proc.lock();
-
- try {
- if (proc.terminated()) {
- notifyTasksFailed(tasks, CRASHED, null);
-
- return;
- }
-
- HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
-
- req.jobId(job.id());
- req.jobInfo(job.info());
- req.tasks(tasks);
-
- comm.sendMessage(proc.descriptor(), req);
- }
- finally {
- proc.unlock();
- }
- }
-
- /**
- * @return External task metadata.
- */
- private HadoopExternalTaskMetadata buildTaskMeta() {
- HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata();
-
- meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
- meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
- "-DIGNITE_HOME=" + U.getIgniteHome()));
-
- return meta;
- }
-
- /**
- * @param tasks Tasks to notify about.
- * @param state Fail state.
- * @param e Optional error.
- */
- private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
- HadoopTaskStatus fail = new HadoopTaskStatus(state, e);
-
- for (HadoopTaskInfo task : tasks)
- jobTracker.onTaskFinished(task, fail);
- }
-
- /**
- * Starts a child process that will be ready to execute Hadoop tasks.
- *
- * @param job Job instance.
- * @param plan Map reduce plan.
- * @return Started process.
- */
- private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) {
- final UUID childProcId = UUID.randomUUID();
-
- HadoopJobId jobId = job.id();
-
- final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId);
-
- final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
-
- HadoopProcess old = runningProcsByJobId.put(jobId, proc);
-
- assert old == null;
-
- old = runningProcsByProcId.put(childProcId, proc);
-
- assert old == null;
-
- ctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- if (!busyLock.tryReadLock()) {
- fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
-
- return;
- }
-
- try {
- HadoopExternalTaskMetadata startMeta = buildTaskMeta();
-
- if (log.isDebugEnabled())
- log.debug("Created hadoop child process metadata for job [job=" + job +
- ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
-
- Process proc = startJavaProcess(childProcId, startMeta, job);
-
- BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
- String line;
-
- // Read all the process output.
- while ((line = rdr.readLine()) != null) {
- if (log.isDebugEnabled())
- log.debug("Tracing process output: " + line);
-
- if ("Started".equals(line)) {
- // Process started successfully, it should not write anything more to the output stream.
- if (log.isDebugEnabled())
- log.debug("Successfully started child process [childProcId=" + childProcId +
- ", meta=" + job + ']');
-
- fut.onProcessStarted(proc);
-
- break;
- }
- else if ("Failed".equals(line)) {
- StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
-
- while ((line = rdr.readLine()) != null)
- sb.append(" ").append(line).append("\n");
-
- // Cut last character.
- sb.setLength(sb.length() - 1);
-
- log.warning(sb.toString());
-
- fut.onDone(new IgniteCheckedException(sb.toString()));
-
- break;
- }
- }
- }
- catch (Throwable e) {
- fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
-
- if (e instanceof Error)
- throw (Error)e;
- }
- finally {
- busyLock.readUnlock();
- }
- }
- }, true);
-
- fut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
- @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
- try {
- // Make sure there were no exceptions.
- f.get();
-
- prepareForJob(proc, job, plan);
- }
- catch (IgniteCheckedException ignore) {
- // Exception is printed in future's onDone() method.
- }
- }
- });
-
- return proc;
- }
-
- /**
- * Checks that java local command is available.
- *
- * @throws IgniteCheckedException If initialization failed.
- */
- private void initJavaCommand() throws IgniteCheckedException {
- String javaHome = System.getProperty("java.home");
-
- if (javaHome == null)
- javaHome = System.getenv("JAVA_HOME");
-
- if (javaHome == null)
- throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
-
- javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
-
- try {
- Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
-
- Collection<String> out = readProcessOutput(proc);
-
- int res = proc.waitFor();
-
- if (res != 0)
- throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
- "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
-
- if (log.isInfoEnabled()) {
- log.info("Will use java for external task execution: ");
-
- for (String s : out)
- log.info(" " + s);
- }
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to check java for external task execution.", e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
- }
- }
-
- /**
- * Reads process output line-by-line.
- *
- * @param proc Process to read output.
- * @return Read lines.
- * @throws IOException If read failed.
- */
- private Collection<String> readProcessOutput(Process proc) throws IOException {
- BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
- Collection<String> res = new ArrayList<>();
-
- String s;
-
- while ((s = rdr.readLine()) != null)
- res.add(s);
-
- return res;
- }
-
- /**
- * Builds process from metadata.
- *
- * @param childProcId Child process ID.
- * @param startMeta Metadata.
- * @param job Job.
- * @return Started process.
- */
- private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
- HadoopJob job) throws Exception {
- String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
-
- if (log.isDebugEnabled())
- log.debug("Will write process log output to: " + outFldr);
-
- List<String> cmd = new ArrayList<>();
-
- File workDir = U.resolveWorkDirectory("", false);
-
- cmd.add(javaCmd);
- cmd.addAll(startMeta.jvmOptions());
- cmd.add("-cp");
- cmd.add(buildClasspath(startMeta.classpath()));
- cmd.add(HadoopExternalProcessStarter.class.getName());
- cmd.add("-cpid");
- cmd.add(String.valueOf(childProcId));
- cmd.add("-ppid");
- cmd.add(String.valueOf(nodeDesc.processId()));
- cmd.add("-nid");
- cmd.add(String.valueOf(nodeDesc.parentNodeId()));
- cmd.add("-addr");
- cmd.add(nodeDesc.address());
- cmd.add("-tport");
- cmd.add(String.valueOf(nodeDesc.tcpPort()));
- cmd.add("-sport");
- cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
- cmd.add("-out");
- cmd.add(outFldr);
- cmd.add("-wd");
- cmd.add(workDir.getAbsolutePath());
-
- return new ProcessBuilder(cmd)
- .redirectErrorStream(true)
- .directory(workDir)
- .start();
- }
-
- /**
- * Gets job work folder.
- *
- * @param jobId Job ID.
- * @return Job work folder.
- */
- private String jobWorkFolder(HadoopJobId jobId) {
- return outputBase + File.separator + "Job_" + jobId;
- }
-
- /**
- * @param cp Classpath collection.
- * @return Classpath string.
- */
- private String buildClasspath(Collection<String> cp) {
- assert !cp.isEmpty();
-
- StringBuilder sb = new StringBuilder();
-
- for (String s : cp)
- sb.append(s).append(pathSep);
-
- sb.setLength(sb.length() - 1);
-
- return sb.toString();
- }
-
- /**
- * Sends job info update request to remote process.
- *
- * @param proc Process to send request to.
- * @param meta Job metadata.
- */
- private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) {
- Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
-
- int rdcNum = meta.mapReducePlan().reducers();
-
- HadoopProcessDescriptor[] addrs = null;
-
- if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
- addrs = new HadoopProcessDescriptor[rdcNum];
-
- for (int i = 0; i < rdcNum; i++) {
- HadoopProcessDescriptor desc = rdcAddrs.get(i);
-
- assert desc != null : "Missing reducer address [meta=" + meta + ", rdc=" + i + ']';
-
- addrs[i] = desc;
- }
- }
-
- try {
- comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
- }
- catch (IgniteCheckedException e) {
- if (!proc.terminated()) {
- log.error("Failed to send job state update message to remote child process (will kill the process) " +
- "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
-
- proc.terminate();
- }
- }
- }
-
- /**
- * Sends prepare request to remote process.
- *
- * @param proc Process to send request to.
- * @param job Job.
- * @param plan Map reduce plan.
- */
- private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) {
- try {
- comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
- plan.reducers(), plan.reducers(ctx.localNodeId())));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
- ", plan=" + plan + ']', e);
-
- proc.terminate();
- }
- }
-
- /**
- * Processes task finished message.
- *
- * @param desc Remote process descriptor.
- * @param taskMsg Task finished message.
- */
- private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
- HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
- if (proc != null)
- proc.removeTask(taskMsg.taskInfo());
-
- jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
- }
-
- /**
- * Listener for messages received from child task processes.
- */
- private class MessageListener implements HadoopMessageListener {
- /** {@inheritDoc} */
- @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
- if (!busyLock.tryReadLock())
- return;
-
- try {
- if (msg instanceof HadoopProcessStartedAck) {
- HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
- assert proc != null : "Missing child process for processId: " + desc;
-
- HadoopProcessFuture fut = proc.initFut;
-
- if (fut != null)
- fut.onReplyReceived(desc);
- // Safety.
- else
- log.warning("Failed to find process start future (will ignore): " + desc);
- }
- else if (msg instanceof HadoopTaskFinishedMessage) {
- HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg;
-
- processTaskFinishedMessage(desc, taskMsg);
- }
- else
- log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
- if (!busyLock.tryReadLock())
- return;
-
- try {
- if (desc == null) {
- U.warn(log, "Handshake failed.");
-
- return;
- }
-
- // Notify job tracker about failed tasks.
- HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
- if (proc != null) {
- Collection<HadoopTaskInfo> tasks = proc.tasks();
-
- if (!F.isEmpty(tasks)) {
- log.warning("Lost connection with alive process (will terminate): " + desc);
-
- HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
- new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
-
- for (HadoopTaskInfo info : tasks)
- jobTracker.onTaskFinished(info, status);
-
- runningProcsByJobId.remove(proc.jobId(), proc);
- }
-
- // Safety.
- proc.terminate();
- }
- }
- finally {
- busyLock.readUnlock();
- }
- }
- }
-
- /**
- * Hadoop process.
- */
- private static class HadoopProcess extends ReentrantLock {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- private final HadoopJobId jobId;
-
- /** Process. */
- private Process proc;
-
- /** Init future. Completes when process is ready to receive messages. */
- private final HadoopProcessFuture initFut;
-
- /** Process descriptor. */
- private HadoopProcessDescriptor procDesc;
-
- /** Reducers planned for this process. */
- private Collection<Integer> reducers;
-
- /** Tasks. */
- private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
-
- /** Terminated flag. */
- private volatile boolean terminated;
-
- /**
- * @param jobId Job ID.
- * @param initFut Init future.
- */
- private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut,
- int[] reducers) {
- this.jobId = jobId;
- this.initFut = initFut;
-
- if (!F.isEmpty(reducers)) {
- this.reducers = new ArrayList<>(reducers.length);
-
- for (int r : reducers)
- this.reducers.add(r);
- }
- }
-
- /**
- * @return Communication process descriptor.
- */
- private HadoopProcessDescriptor descriptor() {
- return procDesc;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * Callback invoked once the child process is initialized.
- *
- * @param proc Java process representation.
- * @param procDesc Process descriptor.
- */
- private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
- this.proc = proc;
- this.procDesc = procDesc;
- }
-
- /**
- * Terminates process (kills it).
- */
- private void terminate() {
- // Guard against concurrent message sending.
- lock();
-
- try {
- terminated = true;
-
- if (!initFut.isDone())
- initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
- @Override public void apply(
- IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
- proc.destroy();
- }
- });
- else
- proc.destroy();
- }
- finally {
- unlock();
- }
- }
-
- /**
- * @return Terminated flag.
- */
- private boolean terminated() {
- return terminated;
- }
-
- /**
- * Adds tasks to this process.
- *
- * @param tasks Tasks to add.
- */
- private void addTasks(Collection<HadoopTaskInfo> tasks) {
- this.tasks.addAll(tasks);
- }
-
- /**
- * Removes a task once it has completed.
- *
- * @param task Task to remove.
- */
- private void removeTask(HadoopTaskInfo task) {
- // The 'tasks' collection is final and always initialized, so no null check is needed.
- tasks.remove(task);
- }
-
- /**
- * @return Collection of tasks.
- */
- private Collection<HadoopTaskInfo> tasks() {
- return tasks;
- }
-
- /**
- * @return Planned reducers.
- */
- private Collection<Integer> reducers() {
- return reducers;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopProcess.class, this);
- }
- }
-
- /**
- * Future that completes when the child process has started and its handshake reply has been received.
- */
- private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Child process ID. */
- private UUID childProcId;
-
- /** Job ID. */
- private HadoopJobId jobId;
-
- /** Process descriptor. */
- private HadoopProcessDescriptor desc;
-
- /** Running process. */
- private Process proc;
-
- /** Process started flag. */
- private volatile boolean procStarted;
-
- /** Reply received flag. */
- private volatile boolean replyReceived;
-
- /** Logger. */
- private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
-
- /**
- * @param childProcId Child process ID.
- * @param jobId Job ID.
- */
- private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId) {
- this.childProcId = childProcId;
- this.jobId = jobId;
- }
-
- /**
- * Process started callback.
- *
- * @param proc Started process.
- */
- public void onProcessStarted(Process proc) {
- this.proc = proc;
-
- procStarted = true;
-
- if (procStarted && replyReceived)
- onDone(F.t(proc, desc));
- }
-
- /**
- * Reply received callback.
- *
- * @param desc Child process descriptor.
- */
- public void onReplyReceived(HadoopProcessDescriptor desc) {
- assert childProcId.equals(desc.processId());
-
- this.desc = desc;
-
- replyReceived = true;
-
- if (procStarted && replyReceived)
- onDone(F.t(proc, desc));
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res,
- @Nullable Throwable err) {
- if (err == null) {
- HadoopProcess proc = runningProcsByProcId.get(childProcId);
-
- assert proc != null;
-
- assert proc.initFut == this;
-
- proc.onInitialized(res.get1(), res.get2());
-
- if (!F.isEmpty(proc.reducers()))
- jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
- }
- else {
- // Clean up since init failed.
- runningProcsByJobId.remove(jobId);
- runningProcsByProcId.remove(childProcId);
- }
-
- if (super.onDone(res, err)) {
- if (err == null) {
- if (log.isDebugEnabled())
- log.debug("Initialized child process for external task execution [jobId=" + jobId +
- ", desc=" + desc + ", initTime=" + duration() + ']');
- }
- else
- U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
- ", desc=" + desc + ']', err);
-
- return true;
- }
-
- return false;
- }
- }
-}
\ No newline at end of file
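
For context on the file removed above: the executor resolves a java binary, assembles a command line (JVM options first, then -cp, the main class, and its own flags) and launches the child via ProcessBuilder, exactly as startJavaProcess() shows. The following minimal, self-contained sketch reproduces that launch pattern; the class names, the -Xmx option and the -cpid flag value here are illustrative stand-ins, not Ignite API.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of the child-JVM launch pattern used by startJavaProcess(). */
public class ChildJvmLaunchSketch {
    public static void main(String[] args) throws Exception {
        // Resolve the java executable from the running JVM, as initJavaCommand() does.
        boolean win = System.getProperty("os.name").toLowerCase().contains("win");

        String javaCmd = System.getProperty("java.home") + File.separator + "bin" +
            File.separator + (win ? "java.exe" : "java");

        // Command line: JVM options first, then classpath, main class and program flags.
        List<String> cmd = new ArrayList<>();

        cmd.add(javaCmd);
        cmd.add("-Xmx256m");                                 // Illustrative JVM option.
        cmd.add("-cp");
        cmd.add(System.getProperty("java.class.path"));
        cmd.add(ChildJvmLaunchSketch.Child.class.getName()); // Stand-in for HadoopExternalProcessStarter.
        cmd.add("-cpid");                                    // Flag mirrors the ones built above.
        cmd.add(java.util.UUID.randomUUID().toString());

        Process proc = new ProcessBuilder(cmd)
            .redirectErrorStream(true) // Merge stderr into stdout, as the executor does.
            .start();

        // Drain output line-by-line, like readProcessOutput().
        try (java.io.BufferedReader rdr =
            new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream()))) {
            String s;

            while ((s = rdr.readLine()) != null)
                System.out.println("child> " + s);
        }

        System.out.println("Child exited with code " + proc.waitFor());
    }

    /** Trivial child main standing in for the external process starter. */
    public static class Child {
        public static void main(String[] args) {
            System.out.println("started with args: " + java.util.Arrays.toString(args));
        }
    }
}

Note that redirectErrorStream(true) merges the child's stderr into its stdout, so a single reader such as readProcessOutput() above sees all output in order.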
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java
deleted file mode 100644
index 6e42a7f..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskMetadata.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.util.Collection;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * External task metadata (classpath, JVM options) needed to start external process execution.
- */
-public class HadoopExternalTaskMetadata {
- /** Process classpath. */
- private Collection<String> classpath;
-
- /** JVM options. */
- @GridToStringInclude
- private Collection<String> jvmOpts;
-
- /**
- * @return JVM options.
- */
- public Collection<String> jvmOptions() {
- return jvmOpts;
- }
-
- /**
- * @param jvmOpts JVM options.
- */
- public void jvmOptions(Collection<String> jvmOpts) {
- this.jvmOpts = jvmOpts;
- }
-
- /**
- * @return Classpath.
- */
- public Collection<String> classpath() {
- return classpath;
- }
-
- /**
- * @param classpath Classpath.
- */
- public void classpath(Collection<String> classpath) {
- this.classpath = classpath;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopExternalTaskMetadata.class, this);
- }
-}
\ No newline at end of file
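
HadoopExternalTaskMetadata above is a plain holder for the classpath and JVM options that feed startJavaProcess(). The executor flattens the classpath with the platform path separator, as buildClasspath() does; a rough sketch follows (the jar and directory paths are made up for illustration):

import java.io.File;
import java.util.Arrays;
import java.util.Collection;

public class ClasspathFlattenSketch {
    public static void main(String[] args) {
        // Illustrative entries; in Ignite these come from HadoopExternalTaskMetadata.classpath().
        Collection<String> cp = Arrays.asList("/opt/ignite/libs/ignite-core.jar", "/opt/job/classes");

        // Join with the platform path separator and trim the trailing one, as buildClasspath() does.
        StringBuilder sb = new StringBuilder();

        for (String s : cp)
            sb.append(s).append(File.pathSeparatorChar);

        sb.setLength(sb.length() - 1);

        System.out.println("-cp " + sb);
    }
}

On Java 8 and later the loop collapses to String.join(File.pathSeparator, cp).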
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java
deleted file mode 100644
index 8f457a0..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopJobInfoUpdateRequest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Job info update request.
- */
-public class HadoopJobInfoUpdateRequest implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /** Job phase. */
- @GridToStringInclude
- private HadoopJobPhase jobPhase;
-
- /** Reducers addresses. */
- @GridToStringInclude
- private HadoopProcessDescriptor[] reducersAddrs;
-
- /**
- * Constructor required by {@link Externalizable}.
- */
- public HadoopJobInfoUpdateRequest() {
- // No-op.
- }
-
- /**
- * @param jobId Job ID.
- * @param jobPhase Job phase.
- * @param reducersAddrs Reducers addresses.
- */
- public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase,
- HadoopProcessDescriptor[] reducersAddrs) {
- assert jobId != null;
-
- this.jobId = jobId;
- this.jobPhase = jobPhase;
- this.reducersAddrs = reducersAddrs;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @return Job phase.
- */
- public HadoopJobPhase jobPhase() {
- return jobPhase;
- }
-
- /**
- * @return Reducers addresses.
- */
- public HadoopProcessDescriptor[] reducersAddresses() {
- return reducersAddrs;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
-
- out.writeObject(jobPhase);
- U.writeArray(out, reducersAddrs);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
- jobId.readExternal(in);
-
- jobPhase = (HadoopJobPhase)in.readObject();
- reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopJobInfoUpdateRequest.class, this);
- }
-}
\ No newline at end of file
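
HadoopJobInfoUpdateRequest relies on the Externalizable contract: a public no-arg constructor plus writeExternal()/readExternal() methods that serialize the fields in exactly the same order. A self-contained sketch of that round trip, using simple stand-in fields instead of HadoopJobId and HadoopJobPhase:

import java.io.*;

/** Minimal sketch of the Externalizable round-trip discipline used by HadoopJobInfoUpdateRequest. */
public class ExternalizableSketch implements Externalizable {
    private String jobId; // Stand-in for HadoopJobId.
    private int phase;    // Stand-in for HadoopJobPhase.

    /** Public no-arg constructor required by {@link Externalizable}. */
    public ExternalizableSketch() {
        // No-op.
    }

    public ExternalizableSketch(String jobId, int phase) {
        this.jobId = jobId;
        this.phase = phase;
    }

    // Fields must be written and read in exactly the same order.
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(jobId);
        out.writeInt(phase);
    }

    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        jobId = in.readUTF();
        phase = in.readInt();
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new ExternalizableSketch("job_1", 2));
        }

        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            ExternalizableSketch msg = (ExternalizableSketch)ois.readObject();

            System.out.println("jobId=" + msg.jobId + ", phase=" + msg.phase);
        }
    }
}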