Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:19 UTC
[03/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
new file mode 100644
index 0000000..91a2d6f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -0,0 +1,1625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.jobtracker;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.event.*;
+import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
+
+/**
+ * Hadoop job tracker.
+ */
+public class HadoopJobTracker extends HadoopComponent {
+ /** */
+ private final GridMutex mux = new GridMutex();
+
+ /** */
+ private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
+
+ /** Projection with expiry policy for finished job updates. */
+ private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
+
+ /** Map-reduce execution planner. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private GridHadoopMapReducePlanner mrPlanner;
+
+ /** All the known jobs. */
+ private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<GridHadoopJob>> jobs = new ConcurrentHashMap8<>();
+
+ /** Locally active jobs. */
+ private final ConcurrentMap<GridHadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
+
+ /** Locally requested finish futures. */
+ private final ConcurrentMap<GridHadoopJobId, GridFutureAdapter<GridHadoopJobId>> activeFinishFuts =
+ new ConcurrentHashMap8<>();
+
+ /** Event processing service. */
+ private ExecutorService evtProcSvc;
+
+ /** Component busy lock. */
+ private GridSpinReadWriteLock busyLock;
+
+ /** Closure to check result of async transform of system cache. */
+ private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> gridFut) {
+ try {
+ gridFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to transform system cache.", e);
+ }
+ }
+ };
+
+ /** {@inheritDoc} */
+ @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+ super.start(ctx);
+
+ busyLock = new GridSpinReadWriteLock();
+
+ evtProcSvc = Executors.newFixedThreadPool(1);
+ }
+
+ /**
+ * @return Job meta projection.
+ */
+ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+ private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
+ GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
+
+ if (prj == null) {
+ synchronized (mux) {
+ if ((prj = jobMetaPrj) == null) {
+ CacheProjection<Object, Object> sysCache = ctx.kernalContext().cache()
+ .cache(CU.SYS_CACHE_HADOOP_MR);
+
+ assert sysCache != null;
+
+ mrPlanner = ctx.planner();
+
+ try {
+ ctx.kernalContext().resource().injectGeneric(mrPlanner);
+ }
+ catch (IgniteCheckedException e) { // Must not happen.
+ U.error(log, "Failed to inject resources.", e);
+
+ throw new IllegalStateException(e);
+ }
+
+ jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)
+ sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
+
+ if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
+ ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
+ new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl()));
+
+ finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
+ }
+ else
+ finishedJobMetaPrj = jobMetaPrj;
+ }
+ }
+ }
+
+ return prj;
+ }
+
+ /**
+ * @return Projection with expiry policy for finished job updates.
+ */
+ private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
+ GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
+
+ if (prj == null) {
+ jobMetaCache();
+
+ prj = finishedJobMetaPrj;
+
+ assert prj != null;
+ }
+
+ return prj;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ jobMetaCache().context().continuousQueries().executeInternalQuery(
+ new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() {
+ @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId,
+ ? extends GridHadoopJobMetadata>> evts) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ // Must process query callback in a separate thread to avoid deadlocks.
+ evtProcSvc.submit(new EventHandler() {
+ @Override protected void body() throws IgniteCheckedException {
+ processJobMetadataUpdates(evts);
+ }
+ });
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ },
+ null,
+ true,
+ true
+ );
+
+ ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(final Event evt) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ // Must process discovery callback in a separate thread to avoid deadlock.
+ evtProcSvc.submit(new EventHandler() {
+ @Override protected void body() {
+ processNodeLeft((DiscoveryEvent)evt);
+ }
+ });
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+ }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ busyLock.writeLock();
+
+ evtProcSvc.shutdown();
+
+ // Fail all pending futures.
+ for (GridFutureAdapter<GridHadoopJobId> fut : activeFinishFuts.values())
+ fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping)."));
+ }
+
+ /**
+ * Submits execution of Hadoop job to grid.
+ *
+ * @param jobId Job ID.
+ * @param info Job info.
+ * @return Job completion future.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteInternalFuture<GridHadoopJobId> submit(GridHadoopJobId jobId, GridHadoopJobInfo info) {
+ if (!busyLock.tryReadLock()) {
+ return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " +
+ "(grid is stopping): " + info));
+ }
+
+ try {
+ long jobPrepare = U.currentTimeMillis();
+
+ if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
+ throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
+
+ GridHadoopJob job = job(jobId, info);
+
+ GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
+
+ GridHadoopJobMetadata meta = new GridHadoopJobMetadata(ctx.localNodeId(), jobId, info);
+
+ meta.mapReducePlan(mrPlan);
+
+ meta.pendingSplits(allSplits(mrPlan));
+ meta.pendingReducers(allReducers(mrPlan));
+
+ GridFutureAdapter<GridHadoopJobId> completeFut = new GridFutureAdapter<>();
+
+ GridFutureAdapter<GridHadoopJobId> old = activeFinishFuts.put(jobId, completeFut);
+
+ assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']';
+
+ if (log.isDebugEnabled())
+ log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']');
+
+ long jobStart = U.currentTimeMillis();
+
+ GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(meta.counters(),
+ ctx.localNodeId());
+
+ perfCntr.clientSubmissionEvents(info);
+ perfCntr.onJobPrepare(jobPrepare);
+ perfCntr.onJobStart(jobStart);
+
+ if (jobMetaCache().putIfAbsent(jobId, meta) != null)
+ throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
+
+ return completeFut;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to submit job: " + jobId, e);
+
+ return new GridFinishedFutureEx<>(e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Converts Hadoop job metadata to job status.
+ *
+ * @param meta Metadata.
+ * @return Status.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public static GridHadoopJobStatus status(GridHadoopJobMetadata meta) {
+ GridHadoopJobInfo jobInfo = meta.jobInfo();
+
+ return new GridHadoopJobStatus(
+ meta.jobId(),
+ jobInfo.jobName(),
+ jobInfo.user(),
+ meta.pendingSplits() != null ? meta.pendingSplits().size() : 0,
+ meta.pendingReducers() != null ? meta.pendingReducers().size() : 0,
+ meta.mapReducePlan().mappers(),
+ meta.mapReducePlan().reducers(),
+ meta.phase(),
+ meta.failCause() != null,
+ meta.version()
+ );
+ }
+
+ /**
+ * Gets Hadoop job status for the given job ID.
+ *
+ * @param jobId Job ID to get status for.
+ * @return Job status for given job ID or {@code null} if job was not found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock())
+ return null; // Grid is stopping.
+
+ try {
+ GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+ return meta != null ? status(meta) : null;
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Gets job finish future.
+ *
+ * @param jobId Job ID.
+ * @return Finish future or {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock())
+ return null; // Grid is stopping.
+
+ try {
+ GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+ if (meta == null)
+ return null;
+
+ if (log.isTraceEnabled())
+ log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
+
+ if (meta.phase() == PHASE_COMPLETE) {
+ if (log.isTraceEnabled())
+ log.trace("Job is complete, returning finished future: " + jobId);
+
+ return new GridFinishedFutureEx<>(jobId, meta.failCause());
+ }
+
+ GridFutureAdapter<GridHadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId,
+ new GridFutureAdapter<GridHadoopJobId>());
+
+ // Re-read meta from cache to close the race window between the check above and future registration.
+ meta = jobMetaCache().get(jobId);
+
+ if (log.isTraceEnabled())
+ log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
+
+ if (meta == null) {
+ fut.onDone();
+
+ activeFinishFuts.remove(jobId, fut);
+ }
+ else if (meta.phase() == PHASE_COMPLETE) {
+ fut.onDone(jobId, meta.failCause());
+
+ activeFinishFuts.remove(jobId, fut);
+ }
+
+ return fut;
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Gets job plan by job ID.
+ *
+ * @param jobId Job ID.
+ * @return Job plan or {@code null} if job was not found.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridHadoopMapReducePlan plan(GridHadoopJobId jobId) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock())
+ return null;
+
+ try {
+ GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+ if (meta != null)
+ return meta.mapReducePlan();
+
+ return null;
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Callback from task executor invoked when a task finishes.
+ *
+ * @param info Task info.
+ * @param status Task status.
+ */
+ @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
+ public void onTaskFinished(GridHadoopTaskInfo info, GridHadoopTaskStatus status) {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ assert status.state() != RUNNING;
+
+ if (log.isDebugEnabled())
+ log.debug("Received task finished callback [info=" + info + ", status=" + status + ']');
+
+ JobLocalState state = activeJobs.get(info.jobId());
+
+ // A CRASHED task may carry a null fail cause, but a FAILED task must not.
+ assert (status.state() != FAILED) || status.failCause() != null :
+ "Invalid task status [info=" + info + ", status=" + status + ']';
+
+ assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)):
+ "Missing local state for finished task [info=" + info + ", status=" + status + ']';
+
+ StackedProcessor incrCntrs = null;
+
+ if (status.state() == COMPLETED)
+ incrCntrs = new IncrementCountersProcessor(null, status.counters());
+
+ switch (info.type()) {
+ case SETUP: {
+ state.onSetupFinished(info, status, incrCntrs);
+
+ break;
+ }
+
+ case MAP: {
+ state.onMapFinished(info, status, incrCntrs);
+
+ break;
+ }
+
+ case REDUCE: {
+ state.onReduceFinished(info, status, incrCntrs);
+
+ break;
+ }
+
+ case COMBINE: {
+ state.onCombineFinished(info, status, incrCntrs);
+
+ break;
+ }
+
+ case COMMIT:
+ case ABORT: {
+ GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
+
+ cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
+ listenAsync(failsLog);
+
+ break;
+ }
+ }
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * @param jobId Job id.
+ * @param c Entry processor to apply.
+ */
+ private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) {
+ jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
+ }
+
+ /**
+ * Callback from task executor invoked when the process is ready to receive shuffle messages.
+ *
+ * @param jobId Job ID.
+ * @param reducers Reducers.
+ * @param desc Process descriptor.
+ */
+ public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers,
+ GridHadoopProcessDescriptor desc) {
+ transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
+ }
+
+ /**
+ * Gets all input splits for the given Hadoop map-reduce plan.
+ *
+ * @param plan Map-reduce plan.
+ * @return Collection of all input splits that should be processed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private Map<GridHadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) {
+ Map<GridHadoopInputSplit, Integer> res = new HashMap<>();
+
+ int taskNum = 0;
+
+ for (UUID nodeId : plan.mapperNodeIds()) {
+ for (GridHadoopInputSplit split : plan.mappers(nodeId)) {
+ if (res.put(split, taskNum++) != null)
+ throw new IllegalStateException("Split duplicate.");
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Gets all reducers for this job.
+ *
+ * @param plan Map-reduce plan.
+ * @return Collection of reducers.
+ */
+ private Collection<Integer> allReducers(GridHadoopMapReducePlan plan) {
+ Collection<Integer> res = new HashSet<>();
+
+ for (int i = 0; i < plan.reducers(); i++)
+ res.add(i);
+
+ return res;
+ }
+
+ /**
+ * Processes node leave (or fail) event.
+ *
+ * @param evt Discovery event.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private void processNodeLeft(DiscoveryEvent evt) {
+ if (log.isDebugEnabled())
+ log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']');
+
+ // Check only if this node is responsible for job status updates.
+ if (ctx.jobUpdateLeader()) {
+ boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder();
+
+ // Iteration over all local entries is correct since system cache is REPLICATED.
+ for (Object metaObj : jobMetaCache().values()) {
+ GridHadoopJobMetadata meta = (GridHadoopJobMetadata)metaObj;
+
+ GridHadoopJobId jobId = meta.jobId();
+
+ GridHadoopMapReducePlan plan = meta.mapReducePlan();
+
+ GridHadoopJobPhase phase = meta.phase();
+
+ try {
+ if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
+ // Failover setup task.
+ GridHadoopJob job = job(jobId, meta.jobInfo());
+
+ Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId);
+
+ assert setupTask != null;
+
+ ctx.taskExecutor().run(job, setupTask);
+ }
+ else if (phase == PHASE_MAP || phase == PHASE_REDUCE) {
+ // Must check all plan nodes, not only the event node, since
+ // multiple nodes may have failed.
+ Collection<GridHadoopInputSplit> cancelSplits = null;
+
+ for (UUID nodeId : plan.mapperNodeIds()) {
+ if (ctx.kernalContext().discovery().node(nodeId) == null) {
+ // Node has left the grid.
+ Collection<GridHadoopInputSplit> mappers = plan.mappers(nodeId);
+
+ if (cancelSplits == null)
+ cancelSplits = new HashSet<>();
+
+ cancelSplits.addAll(mappers);
+ }
+ }
+
+ Collection<Integer> cancelReducers = null;
+
+ for (UUID nodeId : plan.reducerNodeIds()) {
+ if (ctx.kernalContext().discovery().node(nodeId) == null) {
+ // Node has left the grid.
+ int[] reducers = plan.reducers(nodeId);
+
+ if (cancelReducers == null)
+ cancelReducers = new HashSet<>();
+
+ for (int rdc : reducers)
+ cancelReducers.add(rdc);
+ }
+ }
+
+ if (cancelSplits != null || cancelReducers != null)
+ jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException(
+ "One or more nodes participating in map-reduce job execution failed."), cancelSplits,
+ cancelReducers));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to cancel job: " + meta, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param updated Updated cache entries.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processJobMetadataUpdates(
+ Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated)
+ throws IgniteCheckedException {
+ UUID locNodeId = ctx.localNodeId();
+
+ for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) {
+ GridHadoopJobId jobId = entry.getKey();
+ GridHadoopJobMetadata meta = entry.getValue();
+
+ if (meta == null || !ctx.isParticipating(meta))
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("Processing job metadata update callback [locNodeId=" + locNodeId +
+ ", meta=" + meta + ']');
+
+ try {
+ ctx.taskExecutor().onJobStateChanged(meta);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to process job state changed callback (will fail the job) " +
+ "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);
+
+ transform(jobId, new CancelJobProcessor(null, e));
+
+ continue;
+ }
+
+ processJobMetaUpdate(jobId, meta, locNodeId);
+ }
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param plan Map-reduce plan.
+ */
+ private void printPlan(GridHadoopJobId jobId, GridHadoopMapReducePlan plan) {
+ log.info("Plan for " + jobId);
+
+ SB b = new SB();
+
+ b.a(" Map: ");
+
+ for (UUID nodeId : plan.mapperNodeIds())
+ b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' ');
+
+ log.info(b.toString());
+
+ b = new SB();
+
+ b.a(" Reduce: ");
+
+ for (UUID nodeId : plan.reducerNodeIds())
+ b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' ');
+
+ log.info(b.toString());
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param meta Job metadata.
+ * @param locNodeId Local node ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId)
+ throws IgniteCheckedException {
+ JobLocalState state = activeJobs.get(jobId);
+
+ GridHadoopJob job = job(jobId, meta.jobInfo());
+
+ GridHadoopMapReducePlan plan = meta.mapReducePlan();
+
+ switch (meta.phase()) {
+ case PHASE_SETUP: {
+ if (ctx.jobUpdateLeader()) {
+ Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId);
+
+ if (setupTask != null)
+ ctx.taskExecutor().run(job, setupTask);
+ }
+
+ break;
+ }
+
+ case PHASE_MAP: {
+ // Check if we should initiate new task on local node.
+ Collection<GridHadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta);
+
+ if (tasks != null)
+ ctx.taskExecutor().run(job, tasks);
+
+ break;
+ }
+
+ case PHASE_REDUCE: {
+ if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) {
+ GridHadoopTaskInfo info = new GridHadoopTaskInfo(COMMIT, jobId, 0, 0, null);
+
+ if (log.isDebugEnabled())
+ log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId +
+ ", jobId=" + jobId + ']');
+
+ ctx.taskExecutor().run(job, Collections.singletonList(info));
+
+ break;
+ }
+
+ Collection<GridHadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job);
+
+ if (tasks != null)
+ ctx.taskExecutor().run(job, tasks);
+
+ break;
+ }
+
+ case PHASE_CANCELLING: {
+ // Prevent notifying the task executor more than once.
+ if (state != null && state.onCancel()) {
+ if (log.isDebugEnabled())
+ log.debug("Cancelling local task execution for job: " + meta);
+
+ ctx.taskExecutor().cancelTasks(jobId);
+ }
+
+ if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) {
+ if (ctx.jobUpdateLeader()) {
+ if (state == null)
+ state = initState(jobId);
+
+ // Prevent running multiple abort tasks.
+ if (state.onAborted()) {
+ GridHadoopTaskInfo info = new GridHadoopTaskInfo(ABORT, jobId, 0, 0, null);
+
+ if (log.isDebugEnabled())
+ log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId +
+ ", jobId=" + jobId + ']');
+
+ ctx.taskExecutor().run(job, Collections.singletonList(info));
+ }
+ }
+
+ break;
+ }
+ else {
+ // Check if there are unscheduled mappers or reducers.
+ Collection<GridHadoopInputSplit> cancelMappers = new ArrayList<>();
+ Collection<Integer> cancelReducers = new ArrayList<>();
+
+ Collection<GridHadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
+
+ if (mappers != null) {
+ for (GridHadoopInputSplit b : mappers) {
+ if (state == null || !state.mapperScheduled(b))
+ cancelMappers.add(b);
+ }
+ }
+
+ int[] rdc = plan.reducers(ctx.localNodeId());
+
+ if (rdc != null) {
+ for (int r : rdc) {
+ if (state == null || !state.reducerScheduled(r))
+ cancelReducers.add(r);
+ }
+ }
+
+ if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
+ transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
+ }
+
+ break;
+ }
+
+ case PHASE_COMPLETE: {
+ if (log.isDebugEnabled())
+ log.debug("Job execution is complete, will remove local state from active jobs " +
+ "[jobId=" + jobId + ", meta=" + meta + ']');
+
+ if (state != null) {
+ state = activeJobs.remove(jobId);
+
+ assert state != null;
+
+ ctx.shuffle().jobFinished(jobId);
+ }
+
+ GridFutureAdapter<GridHadoopJobId> finishFut = activeFinishFuts.remove(jobId);
+
+ if (finishFut != null) {
+ if (log.isDebugEnabled())
+ log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']');
+
+ finishFut.onDone(jobId, meta.failCause());
+ }
+
+ if (ctx.jobUpdateLeader())
+ job.cleanupStagingDirectory();
+
+ jobs.remove(jobId);
+
+ job.dispose(false);
+
+ if (ctx.jobUpdateLeader()) {
+ ClassLoader ldr = job.getClass().getClassLoader();
+
+ try {
+ String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY);
+
+ if (statWriterClsName != null) {
+ Class<?> cls = ldr.loadClass(statWriterClsName);
+
+ GridHadoopCounterWriter writer = (GridHadoopCounterWriter)cls.newInstance();
+
+ GridHadoopCounters cntrs = meta.counters();
+
+ writer.write(job.info(), jobId, cntrs);
+ }
+ }
+ catch (Exception e) {
+ log.error("Can't write statistic due to: ", e);
+ }
+ }
+
+ break;
+ }
+
+ default:
+ throw new IllegalStateException("Unknown phase: " + meta.phase());
+ }
+ }
+
+ /**
+ * Creates setup task based on job information.
+ *
+ * @param jobId Job ID.
+ * @return Setup task wrapped in collection, or {@code null} if the job is already active locally.
+ */
+ @Nullable private Collection<GridHadoopTaskInfo> setupTask(GridHadoopJobId jobId) {
+ if (activeJobs.containsKey(jobId))
+ return null;
+ else {
+ initState(jobId);
+
+ return Collections.singleton(new GridHadoopTaskInfo(SETUP, jobId, 0, 0, null));
+ }
+ }
+
+ /**
+ * Creates mapper tasks based on job information.
+ *
+ * @param mappers Mapper blocks.
+ * @param meta Job metadata.
+ * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
+ */
+ private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, GridHadoopJobMetadata meta) {
+ UUID locNodeId = ctx.localNodeId();
+ GridHadoopJobId jobId = meta.jobId();
+
+ JobLocalState state = activeJobs.get(jobId);
+
+ Collection<GridHadoopTaskInfo> tasks = null;
+
+ if (mappers != null) {
+ if (state == null)
+ state = initState(jobId);
+
+ for (GridHadoopInputSplit split : mappers) {
+ if (state.addMapper(split)) {
+ if (log.isDebugEnabled())
+ log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId +
+ ", split=" + split + ']');
+
+ GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);
+
+ if (tasks == null)
+ tasks = new ArrayList<>();
+
+ tasks.add(taskInfo);
+ }
+ }
+ }
+
+ return tasks;
+ }
+
+ /**
+ * Creates reducer tasks based on job information.
+ *
+ * @param reducers Reducers (may be {@code null}).
+ * @param job Job instance.
+ * @return Collection of created task infos or {@code null} if no reducer tasks scheduled for local node.
+ */
+ private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, GridHadoopJob job) {
+ UUID locNodeId = ctx.localNodeId();
+ GridHadoopJobId jobId = job.id();
+
+ JobLocalState state = activeJobs.get(jobId);
+
+ Collection<GridHadoopTaskInfo> tasks = null;
+
+ if (reducers != null) {
+ if (state == null)
+ state = initState(job.id());
+
+ for (int rdc : reducers) {
+ if (state.addReducer(rdc)) {
+ if (log.isDebugEnabled())
+ log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId +
+ ", rdc=" + rdc + ']');
+
+ GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(REDUCE, jobId, rdc, 0, null);
+
+ if (tasks == null)
+ tasks = new ArrayList<>();
+
+ tasks.add(taskInfo);
+ }
+ }
+ }
+
+ return tasks;
+ }
+
+ /**
+ * Initializes local state for given job metadata.
+ *
+ * @param jobId Job ID.
+ * @return Local state.
+ */
+ private JobLocalState initState(GridHadoopJobId jobId) {
+ return F.addIfAbsent(activeJobs, jobId, new JobLocalState());
+ }
+
+ /**
+ * Gets or creates job instance.
+ *
+ * @param jobId Job ID.
+ * @param jobInfo Job info.
+ * @return Job.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException {
+ GridFutureAdapterEx<GridHadoopJob> fut = jobs.get(jobId);
+
+ if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<GridHadoopJob>())) != null)
+ return fut.get();
+
+ fut = jobs.get(jobId);
+
+ GridHadoopJob job = null;
+
+ try {
+ if (jobInfo == null) {
+ GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+ if (meta == null)
+ throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);
+
+ jobInfo = meta.jobInfo();
+ }
+
+ job = jobInfo.createJob(jobId, log);
+
+ job.initialize(false, ctx.localNodeId());
+
+ fut.onDone(job);
+
+ return job;
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+
+ jobs.remove(jobId, fut);
+
+ if (job != null) {
+ try {
+ job.dispose(false);
+ }
+ catch (IgniteCheckedException e0) {
+ U.error(log, "Failed to dispose job: " + jobId, e0);
+ }
+ }
+
+ throw e;
+ }
+ }
+
+ /**
+ * Kills job.
+ *
+ * @param jobId Job ID.
+ * @return {@code True} if job was killed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean killJob(GridHadoopJobId jobId) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock())
+ return false; // Grid is stopping.
+
+ try {
+ GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+ if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
+ HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled.");
+
+ jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err));
+ }
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+
+ IgniteInternalFuture<?> fut = finishFuture(jobId);
+
+ if (fut != null) {
+ try {
+ fut.get();
+ }
+ catch (Throwable e) {
+ if (e.getCause() instanceof HadoopTaskCancelledException)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns job counters.
+ *
+ * @param jobId Job identifier.
+ * @return Job counters or {@code null} if job cannot be found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridHadoopCounters jobCounters(GridHadoopJobId jobId) throws IgniteCheckedException {
+ if (!busyLock.tryReadLock())
+ return null;
+
+ try {
+ final GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+
+ return meta != null ? meta.counters() : null;
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Event handler protected by busy lock.
+ */
+ private abstract class EventHandler implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ if (!busyLock.tryReadLock())
+ return;
+
+ try {
+ body();
+ }
+ catch (Throwable e) {
+ U.error(log, "Unhandled exception while processing event.", e);
+ }
+ finally {
+ busyLock.readUnlock();
+ }
+ }
+
+ /**
+ * Handler body.
+ */
+ protected abstract void body() throws Exception;
+ }
+
+ /**
+ * Local state of a job being processed on this node.
+ */
+ private class JobLocalState {
+ /** Mappers. */
+ private final Collection<GridHadoopInputSplit> currMappers = new HashSet<>();
+
+ /** Reducers. */
+ private final Collection<Integer> currReducers = new HashSet<>();
+
+ /** Number of completed mappers. */
+ private final AtomicInteger completedMappersCnt = new AtomicInteger();
+
+ /** Cancelled flag. */
+ private boolean cancelled;
+
+ /** Aborted flag. */
+ private boolean aborted;
+
+ /**
+ * @param mapSplit Map split to add.
+ * @return {@code True} if mapper was added.
+ */
+ private boolean addMapper(GridHadoopInputSplit mapSplit) {
+ return currMappers.add(mapSplit);
+ }
+
+ /**
+ * @param rdc Reducer number to add.
+ * @return {@code True} if reducer was added.
+ */
+ private boolean addReducer(int rdc) {
+ return currReducers.add(rdc);
+ }
+
+ /**
+ * Checks whether the given split was scheduled on the local node.
+ *
+ * @param mapSplit Map split to check.
+ * @return {@code True} if mapper was scheduled.
+ */
+ public boolean mapperScheduled(GridHadoopInputSplit mapSplit) {
+ return currMappers.contains(mapSplit);
+ }
+
+ /**
+ * Checks whether the given reducer was scheduled on the local node.
+ *
+ * @param rdc Reducer number to check.
+ * @return {@code True} if reducer was scheduled.
+ */
+ public boolean reducerScheduled(int rdc) {
+ return currReducers.contains(rdc);
+ }
+
+ /**
+ * @param taskInfo Task info.
+ * @param status Task status.
+ * @param prev Previous closure.
+ */
+ private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
+ final GridHadoopJobId jobId = taskInfo.jobId();
+
+ if (status.state() == FAILED || status.state() == CRASHED)
+ transform(jobId, new CancelJobProcessor(prev, status.failCause()));
+ else
+ transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP));
+ }
+
+ /**
+ * @param taskInfo Task info.
+ * @param status Task status.
+ * @param prev Previous closure.
+ */
+ private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
+ final StackedProcessor prev) {
+ final GridHadoopJobId jobId = taskInfo.jobId();
+
+ boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();
+
+ if (status.state() == FAILED || status.state() == CRASHED) {
+ // Fail the whole job.
+ transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));
+
+ return;
+ }
+
+ IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() {
+ @Override public void applyx(IgniteInternalFuture<?> f) {
+ Throwable err = null;
+
+ if (f != null) {
+ try {
+ f.get();
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+ }
+ }
+
+ transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
+ }
+ };
+
+ if (lastMapperFinished)
+ ctx.shuffle().flush(jobId).listenAsync(cacheUpdater);
+ else
+ cacheUpdater.apply(null);
+ }
+
+ /**
+ * @param taskInfo Task info.
+ * @param status Task status.
+ * @param prev Previous closure.
+ */
+ private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
+ GridHadoopJobId jobId = taskInfo.jobId();
+
+ if (status.state() == FAILED || status.state() == CRASHED)
+ // Fail the whole job.
+ transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()));
+ else
+ transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber()));
+ }
+
+ /**
+ * @param taskInfo Task info.
+ * @param status Task status.
+ * @param prev Previous closure.
+ */
+ private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
+ final StackedProcessor prev) {
+ final GridHadoopJobId jobId = taskInfo.jobId();
+
+ if (status.state() == FAILED || status.state() == CRASHED)
+ // Fail the whole job.
+ transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));
+ else {
+ ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteInternalFuture<?>>() {
+ @Override public void applyx(IgniteInternalFuture<?> f) {
+ Throwable err = null;
+
+ if (f != null) {
+ try {
+ f.get();
+ }
+ catch (IgniteCheckedException e) {
+ err = e;
+ }
+ }
+
+ transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
+ }
+ });
+ }
+ }
+
+ /**
+ * @return {@code True} if job was cancelled by this (first) call.
+ */
+ public boolean onCancel() {
+ if (!cancelled && !aborted) {
+ cancelled = true;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @return {@code True} if job was aborted by this (first) call.
+ */
+ public boolean onAborted() {
+ if (!aborted) {
+ aborted = true;
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Update job phase transform closure.
+ */
+ private static class UpdatePhaseProcessor extends StackedProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Phase to update. */
+ private final GridHadoopJobPhase phase;
+
+ /**
+ * @param prev Previous closure.
+ * @param phase Phase to update.
+ */
+ private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) {
+ super(prev);
+
+ this.phase = phase;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ cp.phase(phase);
+ }
+ }
+
+ /**
+ * Remove mapper transform closure.
+ */
+ private static class RemoveMappersProcessor extends StackedProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Mapper splits to remove. */
+ private final Collection<GridHadoopInputSplit> splits;
+
+ /** Error. */
+ private final Throwable err;
+
+ /**
+ * @param prev Previous closure.
+ * @param split Mapper split to remove.
+ * @param err Error.
+ */
+ private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) {
+ this(prev, Collections.singletonList(split), err);
+ }
+
+ /**
+ * @param prev Previous closure.
+ * @param splits Mapper splits to remove.
+ * @param err Error.
+ */
+ private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits,
+ Throwable err) {
+ super(prev);
+
+ this.splits = splits;
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+
+ for (GridHadoopInputSplit s : splits)
+ splitsCp.remove(s);
+
+ cp.pendingSplits(splitsCp);
+
+ if (cp.phase() != PHASE_CANCELLING && err != null)
+ cp.failCause(err);
+
+ if (err != null)
+ cp.phase(PHASE_CANCELLING);
+
+ if (splitsCp.isEmpty()) {
+ if (cp.phase() != PHASE_CANCELLING)
+ cp.phase(PHASE_REDUCE);
+ }
+ }
+ }
+
+ /**
+ * Remove reducer transform closure.
+ */
+ private static class RemoveReducerProcessor extends StackedProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Reducer to remove. */
+ private final int rdc;
+
+ /** Error. */
+ private Throwable err;
+
+ /**
+ * @param prev Previous closure.
+ * @param rdc Reducer to remove.
+ */
+ private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) {
+ super(prev);
+
+ this.rdc = rdc;
+ }
+
+ /**
+ * @param prev Previous closure.
+ * @param rdc Reducer to remove.
+ * @param err Error.
+ */
+ private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) {
+ super(prev);
+
+ this.rdc = rdc;
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
+
+ rdcCp.remove(rdc);
+
+ cp.pendingReducers(rdcCp);
+
+ if (err != null) {
+ cp.phase(PHASE_CANCELLING);
+ cp.failCause(err);
+ }
+ }
+ }
+
+ /**
+ * Initialize reducers.
+ */
+ private static class InitializeReducersProcessor extends StackedProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Reducers. */
+ private final Collection<Integer> rdc;
+
+ /** Process descriptor for reducers. */
+ private final GridHadoopProcessDescriptor desc;
+
+ /**
+ * @param prev Previous closure.
+ * @param rdc Reducers to initialize.
+ * @param desc External process descriptor.
+ */
+ private InitializeReducersProcessor(@Nullable StackedProcessor prev,
+ Collection<Integer> rdc,
+ GridHadoopProcessDescriptor desc) {
+ super(prev);
+
+ assert !F.isEmpty(rdc);
+ assert desc != null;
+
+ this.rdc = rdc;
+ this.desc = desc;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ Map<Integer, GridHadoopProcessDescriptor> oldMap = meta.reducersAddresses();
+
+ Map<Integer, GridHadoopProcessDescriptor> rdcMap = oldMap == null ?
+ new HashMap<Integer, GridHadoopProcessDescriptor>() : new HashMap<>(oldMap);
+
+ for (Integer r : rdc)
+ rdcMap.put(r, desc);
+
+ cp.reducersAddresses(rdcMap);
+ }
+ }
+
+ /**
+ * Cancel job transform closure.
+ */
+ private static class CancelJobProcessor extends StackedProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Mapper splits to remove. */
+ private final Collection<GridHadoopInputSplit> splits;
+
+ /** Reducers to remove. */
+ private final Collection<Integer> rdc;
+
+ /** Error. */
+ private final Throwable err;
+
+ /**
+ * @param prev Previous closure.
+ * @param err Fail cause.
+ */
+ private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
+ this(prev, err, null, null);
+ }
+
+ /**
+ * @param prev Previous closure.
+ * @param splits Splits to remove.
+ * @param rdc Reducers to remove.
+ */
+ private CancelJobProcessor(@Nullable StackedProcessor prev,
+ Collection<GridHadoopInputSplit> splits,
+ Collection<Integer> rdc) {
+ this(prev, null, splits, rdc);
+ }
+
+ /**
+ * @param prev Previous closure.
+ * @param err Error.
+ * @param splits Splits to remove.
+ * @param rdc Reducers to remove.
+ */
+ private CancelJobProcessor(@Nullable StackedProcessor prev,
+ Throwable err,
+ Collection<GridHadoopInputSplit> splits,
+ Collection<Integer> rdc) {
+ super(prev);
+
+ this.splits = splits;
+ this.rdc = rdc;
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ assert meta.phase() == PHASE_CANCELLING || err != null : "Invalid phase for cancel: " + meta;
+
+ Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
+
+ if (rdc != null)
+ rdcCp.removeAll(rdc);
+
+ cp.pendingReducers(rdcCp);
+
+ Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+
+ if (splits != null) {
+ for (GridHadoopInputSplit s : splits)
+ splitsCp.remove(s);
+ }
+
+ cp.pendingSplits(splitsCp);
+
+ cp.phase(PHASE_CANCELLING);
+
+ if (err != null)
+ cp.failCause(err);
+ }
+ }
+
+ /**
+ * Increment counter values closure.
+ */
+ private static class IncrementCountersProcessor extends StackedProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final GridHadoopCounters counters;
+
+ /**
+ * @param prev Previous closure.
+ * @param counters Task counters to add into job counters.
+ */
+ private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) {
+ super(prev);
+
+ assert counters != null;
+
+ this.counters = counters;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+ GridHadoopCounters cntrs = new GridHadoopCountersImpl(cp.counters());
+
+ cntrs.merge(counters);
+
+ cp.counters(cntrs);
+ }
+ }
+
+ /**
+ * Abstract stacked closure.
+ */
+ private abstract static class StackedProcessor implements
+ EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final StackedProcessor prev;
+
+ /**
+ * @param prev Previous closure.
+ */
+ private StackedProcessor(@Nullable StackedProcessor prev) {
+ this.prev = prev;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) {
+ GridHadoopJobMetadata val = apply(e.getValue());
+
+ if (val != null)
+ e.setValue(val);
+ else
+ e.remove();
+
+ return null;
+ }
+
+ /**
+ * @param meta Old value.
+ * @return New value.
+ */
+ private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
+ if (meta == null)
+ return null;
+
+ GridHadoopJobMetadata cp = prev != null ? prev.apply(meta) : new GridHadoopJobMetadata(meta);
+
+ update(meta, cp);
+
+ return cp;
+ }
+
+ /**
+ * Update given job metadata object.
+ *
+ * @param meta Initial job metadata.
+ * @param cp Copy.
+ */
+ protected abstract void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp);
+ }
+}
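
A minimal usage sketch for the tracker above (not part of the commit). The HadoopJobTracker and GridHadoopJobInfo instances are assumed to come from the surrounding Hadoop processor, and the GridHadoopJobId constructor shape is inferred from its use in GridHadoopProtocolSubmitJobTask below:

    /** Hypothetical caller: submits a job and blocks until it completes. */
    static void runJob(HadoopJobTracker jobTracker, GridHadoopJobInfo jobInfo,
        UUID locNodeId) throws IgniteCheckedException {
        // A job ID pairs the submitting node ID with a locally unique counter
        // (constructor shape assumed).
        GridHadoopJobId jobId = new GridHadoopJobId(locNodeId, 1);

        // submit() prepares the map-reduce plan, puts job metadata into the
        // replicated system cache and returns a future that completes once the
        // COMMIT or ABORT task moves the job to PHASE_COMPLETE.
        IgniteInternalFuture<GridHadoopJobId> fut = jobTracker.submit(jobId, jobInfo);

        fut.get(); // Throws IgniteCheckedException if the job failed.

        // status() reads job metadata and returns null after the finished-job
        // entry expires (see getFinishedJobInfoTtl() above).
        GridHadoopJobStatus status = jobTracker.status(jobId);
    }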
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
index c734acd..8fdab9d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
@@ -37,7 +37,7 @@ public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapt
GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
UUID nodeId = UUID.fromString(args.<String>get(0));
Integer id = args.get(1);
- GridHadoopDefaultJobInfo info = args.get(2);
+ HadoopDefaultJobInfo info = args.get(2);
assert nodeId != null;
assert id != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
index 4c83ace..66fb230 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
/**
* Hadoop client protocol.
@@ -238,7 +238,7 @@ public class HadoopClientProtocol implements ClientProtocol {
@Override public String getStagingAreaDir() throws IOException, InterruptedException {
String usr = UserGroupInformation.getCurrentUser().getShortUserName();
- return GridHadoopUtils.stagingAreaDir(conf, usr).toString();
+ return HadoopUtils.stagingAreaDir(conf, usr).toString();
}
/** {@inheritDoc} */
@@ -327,6 +327,6 @@ public class HadoopClientProtocol implements ClientProtocol {
else
assert lastStatus != null;
- return GridHadoopUtils.status(lastStatus, conf);
+ return HadoopUtils.status(lastStatus, conf);
}
}
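
For context, a hedged sketch of the renamed utility as client code reaches it; Configuration and UserGroupInformation are the standard Hadoop classes this file already uses, and the call mirrors getStagingAreaDir() above:

    /** Hypothetical helper mirroring getStagingAreaDir(). */
    static String stagingDir(org.apache.hadoop.conf.Configuration conf)
        throws java.io.IOException {
        String usr = org.apache.hadoop.security.UserGroupInformation
            .getCurrentUser().getShortUserName();

        // Same call as in the hunk, after the GridHadoopUtils -> HadoopUtils rename.
        return HadoopUtils.stagingAreaDir(conf, usr).toString();
    }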
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java
deleted file mode 100644
index 396124e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Shuffle.
- */
-public class GridHadoopShuffle extends GridHadoopComponent {
- /** */
- private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
-
- /** */
- protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- /** {@inheritDoc} */
- @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException {
- super.start(ctx);
-
- ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
- new IgniteBiPredicate<UUID, Object>() {
- @Override public boolean apply(UUID nodeId, Object msg) {
- return onMessageReceived(nodeId, (GridHadoopMessage)msg);
- }
- });
- }
-
- /**
- * Stops shuffle.
- *
- * @param cancel If should cancel all ongoing activities.
- */
- @Override public void stop(boolean cancel) {
- for (GridHadoopShuffleJob job : jobs.values()) {
- try {
- job.close();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to close job.", e);
- }
- }
-
- jobs.clear();
- }
-
- /**
- * Creates new shuffle job.
- *
- * @param jobId Job ID.
- * @return Created shuffle job.
- * @throws IgniteCheckedException If job creation failed.
- */
- private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException {
- GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
-
- GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log,
- ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
-
- UUID[] rdcAddrs = new UUID[plan.reducers()];
-
- for (int i = 0; i < rdcAddrs.length; i++) {
- UUID nodeId = plan.nodeForReducer(i);
-
- assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
-
- rdcAddrs[i] = nodeId;
- }
-
- boolean init = job.initializeReduceAddresses(rdcAddrs);
-
- assert init;
-
- return job;
- }
-
- /**
- * @param nodeId Node ID to send message to.
- * @param msg Message to send.
- * @throws IgniteCheckedException If send failed.
- */
- private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
- ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
-
- ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
- }
-
- /**
- * @param jobId Task info.
- * @return Shuffle job.
- */
- private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException {
- GridHadoopShuffleJob<UUID> res = jobs.get(jobId);
-
- if (res == null) {
- res = newJob(jobId);
-
- GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
-
- if (old != null) {
- res.close();
-
- res = old;
- }
- else if (res.reducersInitialized())
- startSending(res);
- }
-
- return res;
- }
-
- /**
- * Starts message sending thread.
- *
- * @param shuffleJob Job to start sending for.
- */
- private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) {
- shuffleJob.startSending(ctx.kernalContext().gridName(),
- new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() {
- @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException {
- send0(dest, msg);
- }
- }
- );
- }
-
- /**
- * Message received callback.
- *
- * @param src Sender node ID.
- * @param msg Received message.
- * @return {@code True}.
- */
- public boolean onMessageReceived(UUID src, GridHadoopMessage msg) {
- if (msg instanceof GridHadoopShuffleMessage) {
- GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;
-
- try {
- job(m.jobId()).onShuffleMessage(m);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Message handling failed.", e);
- }
-
- try {
- // Reply with ack.
- send0(src, new GridHadoopShuffleAck(m.id(), m.jobId()));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
- }
- }
- else if (msg instanceof GridHadoopShuffleAck) {
- GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg;
-
- try {
- job(m.jobId()).onShuffleAck(m);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Message handling failed.", e);
- }
- }
- else
- throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
- ", msg=" + msg + ']');
-
- return true;
- }
-
- /**
- * @param taskCtx Task info.
- * @return Output.
- */
- public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- return job(taskCtx.taskInfo().jobId()).output(taskCtx);
- }
-
- /**
- * @param taskCtx Task info.
- * @return Input.
- */
- public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- return job(taskCtx.taskInfo().jobId()).input(taskCtx);
- }
-
- /**
- * @param jobId Job id.
- */
- public void jobFinished(GridHadoopJobId jobId) {
- GridHadoopShuffleJob job = jobs.remove(jobId);
-
- if (job != null) {
- try {
- job.close();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to close job: " + jobId, e);
- }
- }
- }
-
- /**
- * Flushes all the outputs for the given job to remote nodes.
- *
- * @param jobId Job ID.
- * @return Future.
- */
- public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) {
- GridHadoopShuffleJob job = jobs.get(jobId);
-
- if (job == null)
- return new GridFinishedFutureEx<>();
-
- try {
- return job.flush();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFutureEx<>(e);
- }
- }
-
- /**
- * @return Memory.
- */
- public GridUnsafeMemory memory() {
- return mem;
- }
-}
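
The removal above is one half of the GridHadoopShuffle -> HadoopShuffle rename; the renamed file follows. A minimal sketch (not in the commit; the shuffle instance and jobId are assumed to come from the Hadoop context) of the flush contract both versions expose, which HadoopJobTracker.onMapFinished() above relies on:

    // Flush buffered shuffle output for a job once the last local mapper is done.
    // flush() returns an already-finished future if the job is unknown locally.
    IgniteInternalFuture<?> flushFut = shuffle.flush(jobId);

    flushFut.get(); // Completes when the job's pending shuffle output is sent.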
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
new file mode 100644
index 0000000..9880093
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Shuffle.
+ */
+public class HadoopShuffle extends HadoopComponent {
+ /** */
+ private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
+
+ /** */
+ protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ /** {@inheritDoc} */
+ @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+ super.start(ctx);
+
+ ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
+ new IgniteBiPredicate<UUID, Object>() {
+ @Override public boolean apply(UUID nodeId, Object msg) {
+ return onMessageReceived(nodeId, (GridHadoopMessage)msg);
+ }
+ });
+ }
+
+ /**
+ * Stops shuffle.
+ *
+ * @param cancel Whether to cancel all ongoing activities.
+ */
+ @Override public void stop(boolean cancel) {
+ for (GridHadoopShuffleJob job : jobs.values()) {
+ try {
+ job.close();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close job.", e);
+ }
+ }
+
+ jobs.clear();
+ }
+
+ /**
+ * Creates new shuffle job.
+ *
+ * @param jobId Job ID.
+ * @return Created shuffle job.
+ * @throws IgniteCheckedException If job creation failed.
+ */
+ private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException {
+ GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
+
+ GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log,
+ ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+
+ UUID[] rdcAddrs = new UUID[plan.reducers()];
+
+ for (int i = 0; i < rdcAddrs.length; i++) {
+ UUID nodeId = plan.nodeForReducer(i);
+
+ assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
+
+ rdcAddrs[i] = nodeId;
+ }
+
+ boolean init = job.initializeReduceAddresses(rdcAddrs);
+
+ assert init;
+
+ return job;
+ }
+
+ /**
+ * @param nodeId Node ID to send message to.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException If send failed.
+ */
+ private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+ ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
+
+ ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @return Shuffle job.
+ */
+ private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException {
+ GridHadoopShuffleJob<UUID> res = jobs.get(jobId);
+
+ if (res == null) {
+ res = newJob(jobId);
+
+ GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+
+ if (old != null) {
+ res.close();
+
+ res = old;
+ }
+ else if (res.reducersInitialized())
+ startSending(res);
+ }
+
+ return res;
+ }
+
+ /**
+ * Starts message sending thread.
+ *
+ * @param shuffleJob Job to start sending for.
+ */
+ private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) {
+ shuffleJob.startSending(ctx.kernalContext().gridName(),
+ new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() {
+ @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException {
+ send0(dest, msg);
+ }
+ }
+ );
+ }
+
+ /**
+ * Message received callback.
+ *
+ * @param src Sender node ID.
+ * @param msg Received message.
+ * @return Always {@code true}.
+ */
+ public boolean onMessageReceived(UUID src, GridHadoopMessage msg) {
+ if (msg instanceof GridHadoopShuffleMessage) {
+ GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;
+
+ try {
+ job(m.jobId()).onShuffleMessage(m);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Message handling failed.", e);
+ }
+
+ try {
+ // Reply with ack.
+ send0(src, new GridHadoopShuffleAck(m.id(), m.jobId()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+ }
+ }
+ else if (msg instanceof GridHadoopShuffleAck) {
+ GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg;
+
+ try {
+ job(m.jobId()).onShuffleAck(m);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Message handling failed.", e);
+ }
+ }
+ else
+ throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
+ ", msg=" + msg + ']');
+
+ return true;
+ }
+
+ /**
+ * @param taskCtx Task context.
+ * @return Output.
+ */
+ public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return job(taskCtx.taskInfo().jobId()).output(taskCtx);
+ }
+
+ /**
+ * @param taskCtx Task context.
+ * @return Input.
+ */
+ public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return job(taskCtx.taskInfo().jobId()).input(taskCtx);
+ }
+
+ /**
+ * @param jobId Job ID.
+ */
+ public void jobFinished(GridHadoopJobId jobId) {
+ GridHadoopShuffleJob job = jobs.remove(jobId);
+
+ if (job != null) {
+ try {
+ job.close();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close job: " + jobId, e);
+ }
+ }
+ }
+
+ /**
+ * Flushes all the outputs for the given job to remote nodes.
+ *
+ * @param jobId Job ID.
+ * @return Future.
+ */
+ public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) {
+ GridHadoopShuffleJob job = jobs.get(jobId);
+
+ if (job == null)
+ return new GridFinishedFutureEx<>();
+
+ try {
+ return job.flush();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFutureEx<>(e);
+ }
+ }
+
+ /**
+ * @return Unsafe memory used by shuffle jobs.
+ */
+ public GridUnsafeMemory memory() {
+ return mem;
+ }
+}
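The job(jobId) method in the file above is a lock-free get-or-create: build a candidate off-map, publish it with putIfAbsent(), and if another thread won the race, close the loser to release its off-heap buffers and adopt the winner. Below is a generic sketch of that pattern under hypothetical names; the real method additionally starts sending when it wins the race and the reducer addresses are initialized.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Illustrative sketch only: the putIfAbsent get-or-create race used by job().
    class GetOrCreateSketch<K, V extends AutoCloseable> {
        interface Factory<K, V> {
            V create(K key) throws Exception;
        }

        private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();

        V getOrCreate(K key, Factory<K, V> factory) throws Exception {
            V res = map.get(key);

            if (res == null) {
                res = factory.create(key);      // Speculative creation, outside any lock.

                V old = map.putIfAbsent(key, res);

                if (old != null) {
                    res.close();                // Lost the race: free the duplicate...
                    res = old;                  // ...and adopt the published instance.
                }
            }

            return res;
        }
    }

On pre-Java-8 targets putIfAbsent was the available primitive; on Java 8+ ConcurrentMap.computeIfAbsent can serialize creation instead, at the cost of holding a bin lock while the (potentially expensive) shuffle job is constructed.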
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java
deleted file mode 100644
index fde5400..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-
-/**
- * Task executor.
- */
-public class GridHadoopEmbeddedTaskExecutor extends GridHadoopTaskExecutorAdapter {
- /** Job tracker. */
- private GridHadoopJobTracker jobTracker;
-
- /** */
- private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
-
- /** Executor service to run tasks. */
- private GridHadoopExecutorService exec;
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- jobTracker = ctx.jobTracker();
-
- exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(),
- ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (exec != null) {
- exec.shutdown(3000);
-
- if (cancel) {
- for (GridHadoopJobId jobId : jobs.keySet())
- cancelTasks(jobId);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) {
- if (exec != null && !exec.shutdown(30000))
- U.warn(log, "Failed to finish running tasks in 30 sec.");
- }
-
- /** {@inheritDoc} */
- @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
- ", tasksCnt=" + tasks.size() + ']');
-
- Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id());
-
- if (executedTasks == null) {
- executedTasks = new GridConcurrentHashSet<>();
-
- Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
-
- assert extractedCol == null;
- }
-
- final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks;
-
- for (final GridHadoopTaskInfo info : tasks) {
- assert info != null;
-
- GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
- ctx.localNodeId()) {
- @Override protected void onTaskFinished(GridHadoopTaskStatus status) {
- if (log.isDebugEnabled())
- log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
- "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
-
- finalExecutedTasks.remove(this);
-
- jobTracker.onTaskFinished(info, status);
- }
-
- @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- return ctx.shuffle().input(taskCtx);
- }
-
- @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- return ctx.shuffle().output(taskCtx);
- }
- };
-
- executedTasks.add(task);
-
- exec.submit(task);
- }
- }
-
- /**
- * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
- * for this job ID.
- * <p>
- * It is guaranteed that this method will not be called concurrently with
- * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
- * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
- *
- * @param jobId Job ID to cancel.
- */
- @Override public void cancelTasks(GridHadoopJobId jobId) {
- Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId);
-
- if (executedTasks != null) {
- for (GridHadoopRunnableTask task : executedTasks)
- task.cancel();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException {
- if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
- Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
-
- assert executedTasks == null || executedTasks.isEmpty();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
index fd4a030..0d49be9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
@@ -130,7 +130,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> {
}
}
}
- catch (GridHadoopTaskCancelledException ignored) {
+ catch (HadoopTaskCancelledException ignored) {
state = GridHadoopTaskState.CANCELED;
}
catch (Throwable e) {
@@ -163,7 +163,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> {
*/
private void runTask(GridHadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
+ throw new HadoopTaskCancelledException("Task cancelled.");

try (GridHadoopTaskOutput out = createOutputInternal(ctx);
GridHadoopTaskInput in = createInputInternal(ctx)) {
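The two hunks above only rename GridHadoopTaskCancelledException to HadoopTaskCancelledException, but together they show the cancellation protocol: runTask() checks a flag at a safe point and throws the dedicated exception, and the caller maps that exception to the CANCELED state rather than treating it as a failure. A self-contained sketch of the protocol under hypothetical names:

    import java.util.concurrent.Callable;

    // Illustrative sketch only: cooperative cancellation via flag plus dedicated exception.
    class CancellableTaskSketch implements Callable<String> {
        // Dedicated type so cancellation is distinguishable from genuine failure.
        static class TaskCancelledException extends RuntimeException {
            TaskCancelledException(String msg) { super(msg); }
        }

        private volatile boolean cancelled;

        void cancel() {
            cancelled = true;   // No interruption; observed at the next safe point.
        }

        @Override public String call() {
            try {
                for (int i = 0; i < 1_000; i++) {
                    if (cancelled)
                        throw new TaskCancelledException("Task cancelled."); // Safe point.

                    // ... one unit of task work ...
                }

                return "COMPLETED";
            }
            catch (TaskCancelledException ignored) {
                return "CANCELED";  // Mapped to a terminal state, as in GridHadoopRunnableTask.
            }
            catch (Throwable e) {
                return "FAILED";
            }
        }
    }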
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java
deleted file mode 100644
index 8f66190..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-
-import java.util.*;
-
-/**
- * Common superclass for task executor.
- */
-public abstract class GridHadoopTaskExecutorAdapter extends GridHadoopComponent {
- /**
- * Runs tasks.
- *
- * @param job Job.
- * @param tasks Tasks.
- * @throws IgniteCheckedException If failed.
- */
- public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException;
-
- /**
- * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
- * for this job ID.
- * <p>
- * It is guaranteed that this method will not be called concurrently with
- * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
- * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
- *
- * @param jobId Job ID to cancel.
- */
- public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException;
-
- /**
- * Job state change callback.
- *
- * @param meta Job metadata.
- */
- public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException;
-}
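The deleted adapter above pins down the contract every task executor (embedded or external-process) must honor: run() submits tasks, cancelTasks() is guaranteed by the job tracker never to race with run() for the same job ID, and onJobStateChanged() lets the executor drop per-job bookkeeping once a job completes. A minimal restatement of that contract as an interface, with hypothetical type parameters:

    import java.util.Collection;

    // Illustrative sketch only: the contract the deleted adapter declares.
    interface TaskExecutorContract<JOB, TASK_INFO, JOB_ID, META> {
        // Submits the given tasks of a job for execution.
        void run(JOB job, Collection<TASK_INFO> tasks) throws Exception;

        // Cancels running and queued tasks of the job. Callers guarantee no
        // concurrent or subsequent run() for the same job ID, so implementations
        // need no locking between the submission and cancellation paths.
        void cancelTasks(JOB_ID jobId) throws Exception;

        // Job state transition callback, e.g. to purge bookkeeping once
        // the job reaches its terminal phase.
        void onJobStateChanged(META meta) throws Exception;
    }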