You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/22 07:01:21 UTC

[41/51] [partial] ignite git commit: IGNITE-3949: Applied new HadoopClassLoader architecture.

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..4c030b4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.unwrapSplit;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends HadoopTaskContext {
+    /** */
+    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+    /** Lazy per-user file system cache used by the Hadoop task. */
+    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
+        = createHadoopLazyConcurrentMap();
+
+    /**
+     * This method is called with reflection upon Job finish with class loader of each task.
+     * This will clean up all the Fs created for specific task.
+     * Each class loader sees uses its own instance of <code>fsMap<code/> since the class loaders
+     * are different.
+     *
+     * @throws IgniteCheckedException On error.
+     */
+    public static void close() throws IgniteCheckedException {
+        fsMap.close();
+    }
+
+    /**
+     * Check for combiner grouping support (available since Hadoop 2.3).
+     */
+    static {
+        boolean ok;
+
+        try {
+            JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+
+            ok = true;
+        }
+        catch (NoSuchMethodException ignore) {
+            ok = false;
+        }
+
+        COMBINE_KEY_GROUPING_SUPPORTED = ok;
+    }
+
+    /** Flag is set if new context-object code is used for running the mapper. */
+    private final boolean useNewMapper;
+
+    /** Flag is set if new context-object code is used for running the reducer. */
+    private final boolean useNewReducer;
+
+    /** Flag is set if new context-object code is used for running the combiner. */
+    private final boolean useNewCombiner;
+
+    /** */
+    private final JobContextImpl jobCtx;
+
+    /** Set if task is to cancelling. */
+    private volatile boolean cancelled;
+
+    /** Current task. */
+    private volatile HadoopTask task;
+
+    /** Local node ID */
+    private final UUID locNodeId;
+
+    /** Counters for task. */
+    private final HadoopCounters cntrs = new HadoopCountersImpl();
+
+    /**
+     * @param taskInfo Task info.
+     * @param job Job.
+     * @param jobId Job ID.
+     * @param locNodeId Local node ID.
+     * @param jobConfDataInput DataInput for read JobConf.
+     */
+    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+        super(taskInfo, job);
+        this.locNodeId = locNodeId;
+
+        // Before create JobConf instance we should set new context class loader.
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            JobConf jobConf = new JobConf();
+
+            try {
+                jobConf.readFields(jobConfDataInput);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+
+            // For map-reduce jobs prefer local writes.
+            jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+            jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+            useNewMapper = jobConf.getUseNewMapper();
+            useNewReducer = jobConf.getUseNewReducer();
+            useNewCombiner = jobConf.getCombinerClass() == null;
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+        return cntrs.counter(grp, name, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /**
+     * Creates appropriate task from current task info.
+     *
+     * @return Task.
+     */
+    private HadoopTask createTask() {
+        boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
+
+        switch (taskInfo().type()) {
+            case SETUP:
+                return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
+
+            case MAP:
+                return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
+
+            case REDUCE:
+                return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
+                    new HadoopV1ReduceTask(taskInfo(), true);
+
+            case COMBINE:
+                return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
+                    new HadoopV1ReduceTask(taskInfo(), false);
+
+            case COMMIT:
+            case ABORT:
+                return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
+                    new HadoopV1CleanupTask(taskInfo(), isAbort);
+
+            default:
+                return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() throws IgniteCheckedException {
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            try {
+                task = createTask();
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+
+            if (cancelled)
+                throw new HadoopTaskCancelledException("Task cancelled.");
+
+            try {
+                task.run(this);
+            }
+            catch (Throwable e) {
+                if (e instanceof Error)
+                    throw e;
+
+                throw transformException(e);
+            }
+        }
+        finally {
+            task = null;
+
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+
+        HadoopTask t = task;
+
+        if (t != null)
+            t.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+        File locDir;
+
+        switch(taskInfo().type()) {
+            case MAP:
+            case REDUCE:
+                job().prepareTaskEnvironment(taskInfo());
+
+                locDir = taskLocalDir(locNodeId, taskInfo());
+
+                break;
+
+            default:
+                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+        }
+
+        ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            FileSystem.get(jobConf());
+
+            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+        }
+        catch (Throwable e) {
+            if (e instanceof Error)
+                throw (Error)e;
+
+            throw transformException(e);
+        }
+        finally {
+            HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+        job().cleanupTaskEnvironment(taskInfo());
+    }
+
+    /**
+     * Creates Hadoop attempt ID.
+     *
+     * @return Attempt ID.
+     */
+    public TaskAttemptID attemptId() {
+        TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
+
+        return new TaskAttemptID(tid, taskInfo().attempt());
+    }
+
+    /**
+     * @param type Task type.
+     * @return Hadoop task type.
+     */
+    private TaskType taskType(HadoopTaskType type) {
+        switch (type) {
+            case SETUP:
+                return TaskType.JOB_SETUP;
+            case MAP:
+            case COMBINE:
+                return TaskType.MAP;
+
+            case REDUCE:
+                return TaskType.REDUCE;
+
+            case COMMIT:
+            case ABORT:
+                return TaskType.JOB_CLEANUP;
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Gets job configuration of the task.
+     *
+     * @return Job configuration.
+     */
+    public JobConf jobConf() {
+        return jobCtx.getJobConf();
+    }
+
+    /**
+     * Gets job context of the task.
+     *
+     * @return Job context.
+     */
+    public JobContextImpl jobContext() {
+        return jobCtx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+        Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
+
+        if (partClsOld != null)
+            return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
+
+        try {
+            return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
+        }
+        catch (ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Gets serializer for specified class.
+     *
+     * @param cls Class.
+     * @param jobConf Job configuration.
+     * @return Appropriate serializer.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+        A.notNull(cls, "cls");
+
+        SerializationFactory factory = new SerializationFactory(jobConf);
+
+        Serialization<?> serialization = factory.getSerialization(cls);
+
+        if (serialization == null)
+            throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+        if (serialization.getClass() == WritableSerialization.class)
+            return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+
+        return new HadoopSerializationWrapper(serialization, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> sortComparator() {
+        return (Comparator<Object>)jobCtx.getSortComparator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> groupComparator() {
+        Comparator<?> res;
+
+        switch (taskInfo().type()) {
+            case COMBINE:
+                res = COMBINE_KEY_GROUPING_SUPPORTED ?
+                    jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
+
+                break;
+
+            case REDUCE:
+                res = jobContext().getGroupingComparator();
+
+                break;
+
+            default:
+                return null;
+        }
+
+        if (res != null && res.getClass() != sortComparator().getClass())
+            return (Comparator<Object>)res;
+
+        return null;
+    }
+
+    /**
+     * @param split Split.
+     * @return Native Hadoop split.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopExternalSplit)
+            return readExternalSplit((HadoopExternalSplit)split);
+
+        if (split instanceof HadoopSplitWrapper)
+            return unwrapSplit((HadoopSplitWrapper)split);
+
+        throw new IllegalStateException("Unknown split: " + split);
+    }
+
+    /**
+     * @param split External split.
+     * @return Native input split.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+        FileSystem fs;
+
+        try {
+            // This assertion uses .startsWith() instead of .equals() because task class loaders may
+            // be reused between tasks of the same job.
+            assert ((HadoopClassLoader)getClass().getClassLoader()).name()
+                .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
+
+            // We also cache Fs there, all them will be cleared explicitly upon the Job end.
+            fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        try (
+            FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+
+            in.seek(split.offset());
+
+            String clsName = Text.readString(in);
+
+            Class<?> cls = jobConf().getClassByName(clsName);
+
+            assert cls != null;
+
+            Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+            Deserializer deserializer = serialization.getDeserializer(cls);
+
+            deserializer.open(in);
+
+            Object res = deserializer.deserialize(null);
+
+            deserializer.close();
+
+            assert res != null;
+
+            return res;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // if current UGI context user is the same, do direct call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..ebf49fd
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Optimized serialization for Hadoop {@link Writable} types.
+ */
+public class HadoopWritableSerialization implements HadoopSerialization {
+    /** */
+    private final Class<? extends Writable> cls;
+
+    /**
+     * @param cls Class.
+     */
+    public HadoopWritableSerialization(Class<? extends Writable> cls) {
+        assert cls != null;
+
+        this.cls = cls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+        try {
+            ((Writable)obj).write(out);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
+
+        try {
+            w.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return w;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
deleted file mode 100644
index 090b336..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- */
-public class HadoopJobMetadata implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    private HadoopJobId jobId;
-
-    /** Job info. */
-    private HadoopJobInfo jobInfo;
-
-    /** Node submitted job. */
-    private UUID submitNodeId;
-
-    /** Map-reduce plan. */
-    private HadoopMapReducePlan mrPlan;
-
-    /** Pending splits for which mapper should be executed. */
-    private Map<HadoopInputSplit, Integer> pendingSplits;
-
-    /** Pending reducers. */
-    private Collection<Integer> pendingReducers;
-
-    /** Reducers addresses. */
-    @GridToStringInclude
-    private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
-
-    /** Job phase. */
-    private HadoopJobPhase phase = PHASE_SETUP;
-
-    /** Fail cause. */
-    @GridToStringExclude
-    private Throwable failCause;
-
-    /** Version. */
-    private long ver;
-
-    /** Job counters */
-    private HadoopCounters counters = new HadoopCountersImpl();
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public HadoopJobMetadata() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param submitNodeId Submit node ID.
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     */
-    public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) {
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.submitNodeId = submitNodeId;
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param src Metadata to copy.
-     */
-    public HadoopJobMetadata(HadoopJobMetadata src) {
-        // Make sure to preserve alphabetic order.
-        counters = src.counters;
-        failCause = src.failCause;
-        jobId = src.jobId;
-        jobInfo = src.jobInfo;
-        mrPlan = src.mrPlan;
-        pendingSplits = src.pendingSplits;
-        pendingReducers = src.pendingReducers;
-        phase = src.phase;
-        reducersAddrs = src.reducersAddrs;
-        submitNodeId = src.submitNodeId;
-        ver = src.ver + 1;
-    }
-
-    /**
-     * @return Submit node ID.
-     */
-    public UUID submitNodeId() {
-        return submitNodeId;
-    }
-
-    /**
-     * @param phase Job phase.
-     */
-    public void phase(HadoopJobPhase phase) {
-        this.phase = phase;
-    }
-
-    /**
-     * @return Job phase.
-     */
-    public HadoopJobPhase phase() {
-        return phase;
-    }
-
-    /**
-     * Gets reducers addresses for external execution.
-     *
-     * @return Reducers addresses.
-     */
-    public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
-        return reducersAddrs;
-    }
-
-    /**
-     * Sets reducers addresses for external execution.
-     *
-     * @param reducersAddrs Map of addresses.
-     */
-    public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
-        this.reducersAddrs = reducersAddrs;
-    }
-
-    /**
-     * Sets collection of pending splits.
-     *
-     * @param pendingSplits Collection of pending splits.
-     */
-    public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
-        this.pendingSplits = pendingSplits;
-    }
-
-    /**
-     * Gets collection of pending splits.
-     *
-     * @return Collection of pending splits.
-     */
-    public Map<HadoopInputSplit, Integer> pendingSplits() {
-        return pendingSplits;
-    }
-
-    /**
-     * Sets collection of pending reducers.
-     *
-     * @param pendingReducers Collection of pending reducers.
-     */
-    public void pendingReducers(Collection<Integer> pendingReducers) {
-        this.pendingReducers = pendingReducers;
-    }
-
-    /**
-     * Gets collection of pending reducers.
-     *
-     * @return Collection of pending reducers.
-     */
-    public Collection<Integer> pendingReducers() {
-        return pendingReducers;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @param mrPlan Map-reduce plan.
-     */
-    public void mapReducePlan(HadoopMapReducePlan mrPlan) {
-        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
-        this.mrPlan = mrPlan;
-    }
-
-    /**
-     * @return Map-reduce plan.
-     */
-    public HadoopMapReducePlan mapReducePlan() {
-        return mrPlan;
-    }
-
-    /**
-     * @return Job info.
-     */
-    public HadoopJobInfo jobInfo() {
-        return jobInfo;
-    }
-
-    /**
-     * Returns job counters.
-     *
-     * @return Collection of counters.
-     */
-    public HadoopCounters counters() {
-        return counters;
-    }
-
-    /**
-     * Sets counters.
-     *
-     * @param counters Collection of counters.
-     */
-    public void counters(HadoopCounters counters) {
-        this.counters = counters;
-    }
-
-    /**
-     * @param failCause Fail cause.
-     */
-    public void failCause(Throwable failCause) {
-        assert failCause != null;
-
-        if (this.failCause == null) // Keep the first error.
-            this.failCause = failCause;
-    }
-
-    /**
-     * @return Fail cause.
-     */
-    public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Version.
-     */
-    public long version() {
-        return ver;
-    }
-
-    /**
-     * @param split Split.
-     * @return Task number.
-     */
-    public int taskNumber(HadoopInputSplit split) {
-        return pendingSplits.get(split);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeUuid(out, submitNodeId);
-        out.writeObject(jobId);
-        out.writeObject(jobInfo);
-        out.writeObject(mrPlan);
-        out.writeObject(pendingSplits);
-        out.writeObject(pendingReducers);
-        out.writeObject(phase);
-        out.writeObject(failCause);
-        out.writeLong(ver);
-        out.writeObject(reducersAddrs);
-        out.writeObject(counters);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        submitNodeId = U.readUuid(in);
-        jobId = (HadoopJobId)in.readObject();
-        jobInfo = (HadoopJobInfo)in.readObject();
-        mrPlan = (HadoopMapReducePlan)in.readObject();
-        pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
-        pendingReducers = (Collection<Integer>)in.readObject();
-        phase = (HadoopJobPhase)in.readObject();
-        failCause = (Throwable)in.readObject();
-        ver = in.readLong();
-        reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
-        counters = (HadoopCounters)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
-            "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
-                failCause.getClass().getName());
-    }
-}
\ No newline at end of file