Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/22 07:01:21 UTC
[41/51] [partial] ignite git commit: IGNITE-3949: Applied new
HadoopClassLoader architecture.
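
The core idea of the new HadoopClassLoader architecture is that each task runs with its own class loader, so static state (such as per-user file system caches) is isolated per task and must be cleaned up through each loader separately. Below is a minimal, self-contained sketch of the underlying JVM behavior; the class names are hypothetical and not part of this commit.

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class PerClassLoaderStaticDemo {
    /** Public nested holder so it is loadable and reflectively accessible. */
    public static class Holder {
        private static int cnt;

        public static void increment() { cnt++; }
        public static int get() { return cnt; }
    }

    public static void main(String[] args) throws Exception {
        // Classpath entry containing this demo's compiled classes.
        URL[] cp = { PerClassLoaderStaticDemo.class.getProtectionDomain()
            .getCodeSource().getLocation() };

        // Two isolated loaders (parent = null): Holder is defined twice.
        try (URLClassLoader ldr1 = new URLClassLoader(cp, null);
             URLClassLoader ldr2 = new URLClassLoader(cp, null)) {
            Class<?> c1 = ldr1.loadClass("PerClassLoaderStaticDemo$Holder");
            Class<?> c2 = ldr2.loadClass("PerClassLoaderStaticDemo$Holder");

            c1.getMethod("increment").invoke(null);

            // Prints 1 and 0: each loader holds its own copy of the static
            // counter, just like the per-loader fsMap below that close()
            // must clear via reflection for every task class loader.
            Method get1 = c1.getMethod("get");
            System.out.println(get1.invoke(null));
            System.out.println(c2.getMethod("get").invoke(null));
        }
    }
}
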
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..4c030b4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTask;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopLazyConcurrentMap;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1CleanupTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
+import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.unwrapSplit;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.FsCacheKey;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
+import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends HadoopTaskContext {
+ /** */
+ private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+ /** Lazy per-user file system cache used by the Hadoop task. */
+ private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
+ = createHadoopLazyConcurrentMap();
+
+ /**
+ * This method is invoked via reflection upon job finish, with the class loader of each task.
+ * It cleans up all the file systems created for that specific task.
+ * Each class loader uses its own instance of <code>fsMap</code> since the class loaders
+ * are different.
+ *
+ * @throws IgniteCheckedException On error.
+ */
+ public static void close() throws IgniteCheckedException {
+ fsMap.close();
+ }
+
+ /**
+ * Check for combiner grouping support (available since Hadoop 2.3).
+ */
+ static {
+ boolean ok;
+
+ try {
+ JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+
+ ok = true;
+ }
+ catch (NoSuchMethodException ignore) {
+ ok = false;
+ }
+
+ COMBINE_KEY_GROUPING_SUPPORTED = ok;
+ }
+
+ /** Flag indicating that the new context-object API is used to run the mapper. */
+ private final boolean useNewMapper;
+
+ /** Flag indicating that the new context-object API is used to run the reducer. */
+ private final boolean useNewReducer;
+
+ /** Flag indicating that the new context-object API is used to run the combiner. */
+ private final boolean useNewCombiner;
+
+ /** */
+ private final JobContextImpl jobCtx;
+
+ /** Set if the task has been cancelled. */
+ private volatile boolean cancelled;
+
+ /** Current task. */
+ private volatile HadoopTask task;
+
+ /** Local node ID. */
+ private final UUID locNodeId;
+
+ /** Counters for task. */
+ private final HadoopCounters cntrs = new HadoopCountersImpl();
+
+ /**
+ * @param taskInfo Task info.
+ * @param job Job.
+ * @param jobId Job ID.
+ * @param locNodeId Local node ID.
+ * @param jobConfDataInput DataInput to read the JobConf from.
+ */
+ public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+ @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+ super(taskInfo, job);
+ this.locNodeId = locNodeId;
+
+ // Before creating the JobConf instance we must set the new context class loader.
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(getClass().getClassLoader());
+
+ try {
+ JobConf jobConf = new JobConf();
+
+ try {
+ jobConf.readFields(jobConfDataInput);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ // For map-reduce jobs prefer local writes.
+ jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+ jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+ useNewMapper = jobConf.getUseNewMapper();
+ useNewReducer = jobConf.getUseNewReducer();
+ useNewCombiner = jobConf.getCombinerClass() == null;
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+ return cntrs.counter(grp, name, cls);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopCounters counters() {
+ return cntrs;
+ }
+
+ /**
+ * Creates appropriate task from current task info.
+ *
+ * @return Task.
+ */
+ private HadoopTask createTask() {
+ boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
+
+ switch (taskInfo().type()) {
+ case SETUP:
+ return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
+
+ case MAP:
+ return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
+
+ case REDUCE:
+ return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
+ new HadoopV1ReduceTask(taskInfo(), true);
+
+ case COMBINE:
+ return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
+ new HadoopV1ReduceTask(taskInfo(), false);
+
+ case COMMIT:
+ case ABORT:
+ return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
+ new HadoopV1CleanupTask(taskInfo(), isAbort);
+
+ default:
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() throws IgniteCheckedException {
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+ try {
+ try {
+ task = createTask();
+ }
+ catch (Throwable e) {
+ if (e instanceof Error)
+ throw e;
+
+ throw transformException(e);
+ }
+
+ if (cancelled)
+ throw new HadoopTaskCancelledException("Task cancelled.");
+
+ try {
+ task.run(this);
+ }
+ catch (Throwable e) {
+ if (e instanceof Error)
+ throw e;
+
+ throw transformException(e);
+ }
+ }
+ finally {
+ task = null;
+
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ cancelled = true;
+
+ HadoopTask t = task;
+
+ if (t != null)
+ t.cancel();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+ File locDir;
+
+ switch (taskInfo().type()) {
+ case MAP:
+ case REDUCE:
+ job().prepareTaskEnvironment(taskInfo());
+
+ locDir = taskLocalDir(locNodeId, taskInfo());
+
+ break;
+
+ default:
+ locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+ }
+
+ ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf().getClassLoader());
+
+ try {
+ FileSystem.get(jobConf());
+
+ LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+ locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+ }
+ catch (Throwable e) {
+ if (e instanceof Error)
+ throw (Error)e;
+
+ throw transformException(e);
+ }
+ finally {
+ HadoopCommonUtils.restoreContextClassLoader(oldLdr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+ job().cleanupTaskEnvironment(taskInfo());
+ }
+
+ /**
+ * Creates Hadoop attempt ID.
+ *
+ * @return Attempt ID.
+ */
+ public TaskAttemptID attemptId() {
+ TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
+
+ return new TaskAttemptID(tid, taskInfo().attempt());
+ }
+
+ /**
+ * @param type Task type.
+ * @return Hadoop task type.
+ */
+ private TaskType taskType(HadoopTaskType type) {
+ switch (type) {
+ case SETUP:
+ return TaskType.JOB_SETUP;
+
+ case MAP:
+ case COMBINE:
+ return TaskType.MAP;
+
+ case REDUCE:
+ return TaskType.REDUCE;
+
+ case COMMIT:
+ case ABORT:
+ return TaskType.JOB_CLEANUP;
+
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Gets job configuration of the task.
+ *
+ * @return Job configuration.
+ */
+ public JobConf jobConf() {
+ return jobCtx.getJobConf();
+ }
+
+ /**
+ * Gets job context of the task.
+ *
+ * @return Job context.
+ */
+ public JobContextImpl jobContext() {
+ return jobCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+ Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
+
+ if (partClsOld != null)
+ return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
+
+ try {
+ return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
+ }
+ catch (ClassNotFoundException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * Gets serializer for specified class.
+ *
+ * @param cls Class.
+ * @param jobConf Job configuration.
+ * @return Appropriate serializer.
+ */
+ @SuppressWarnings("unchecked")
+ private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+ A.notNull(cls, "cls");
+
+ SerializationFactory factory = new SerializationFactory(jobConf);
+
+ Serialization<?> serialization = factory.getSerialization(cls);
+
+ if (serialization == null)
+ throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+ if (serialization.getClass() == WritableSerialization.class)
+ return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+
+ return new HadoopSerializationWrapper(serialization, cls);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+ return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+ return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Comparator<Object> sortComparator() {
+ return (Comparator<Object>)jobCtx.getSortComparator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Comparator<Object> groupComparator() {
+ Comparator<?> res;
+
+ switch (taskInfo().type()) {
+ case COMBINE:
+ res = COMBINE_KEY_GROUPING_SUPPORTED ?
+ jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
+
+ break;
+
+ case REDUCE:
+ res = jobContext().getGroupingComparator();
+
+ break;
+
+ default:
+ return null;
+ }
+
+ if (res != null && res.getClass() != sortComparator().getClass())
+ return (Comparator<Object>)res;
+
+ return null;
+ }
+
+ /**
+ * @param split Split.
+ * @return Native Hadoop split.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException {
+ if (split instanceof HadoopExternalSplit)
+ return readExternalSplit((HadoopExternalSplit)split);
+
+ if (split instanceof HadoopSplitWrapper)
+ return unwrapSplit((HadoopSplitWrapper)split);
+
+ throw new IllegalStateException("Unknown split: " + split);
+ }
+
+ /**
+ * @param split External split.
+ * @return Native input split.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+ Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+ FileSystem fs;
+
+ try {
+ // This assertion uses .startsWith() instead of .equals() because task class loaders may
+ // be reused between tasks of the same job.
+ assert ((HadoopClassLoader)getClass().getClassLoader()).name()
+ .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
+
+ // We also cache file systems there; all of them will be cleared explicitly upon job end.
+ fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ try (FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+
+ in.seek(split.offset());
+
+ String clsName = Text.readString(in);
+
+ Class<?> cls = jobConf().getClassByName(clsName);
+
+ assert cls != null;
+
+ Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+ Deserializer deserializer = serialization.getDeserializer(cls);
+
+ deserializer.open(in);
+
+ Object res = deserializer.deserialize(null);
+
+ deserializer.close();
+
+ assert res != null;
+
+ return res;
+ }
+ catch (IOException | ClassNotFoundException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+ String user = job.info().user();
+
+ user = IgfsUtils.fixUserName(user);
+
+ assert user != null;
+
+ String ugiUser;
+
+ try {
+ UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+ assert currUser != null;
+
+ ugiUser = currUser.getShortUserName();
+ }
+ catch (IOException ioe) {
+ throw new IgniteCheckedException(ioe);
+ }
+
+ try {
+ if (F.eq(user, ugiUser))
+ // If the current UGI context user is the same, do a direct call.
+ return c.call();
+ else {
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ @Override public T run() throws Exception {
+ return c.call();
+ }
+ });
+ }
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
\ No newline at end of file
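
The runAsJobOwner() override above switches identity only when necessary: it calls directly when the current UGI user already matches the job owner, and otherwise wraps the call in ugi.doAs(). A hypothetical standalone sketch of the same pattern (assumes hadoop-common on the classpath; class and method names are illustrative, not from this commit):

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import org.apache.hadoop.security.UserGroupInformation;

public class RunAsDemo {
    public static <T> T runAs(String user, Callable<T> c) throws Exception {
        UserGroupInformation curr = UserGroupInformation.getCurrentUser();

        // Direct call if the UGI context user already matches.
        if (user.equals(curr.getShortUserName()))
            return c.call();

        // Otherwise execute under the job owner's identity.
        UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);

        return ugi.doAs(new PrivilegedExceptionAction<T>() {
            @Override public T run() throws Exception {
                return c.call();
            }
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAs("someUser", () -> "executed as job owner"));
    }
}
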
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..ebf49fd
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Optimized serialization for Hadoop {@link Writable} types.
+ */
+public class HadoopWritableSerialization implements HadoopSerialization {
+ /** */
+ private final Class<? extends Writable> cls;
+
+ /**
+ * @param cls Class.
+ */
+ public HadoopWritableSerialization(Class<? extends Writable> cls) {
+ assert cls != null;
+
+ this.cls = cls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+ assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+ try {
+ ((Writable)obj).write(out);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+ Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
+
+ try {
+ w.readFields(in);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ return w;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+}
\ No newline at end of file
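
A hypothetical round-trip through the serialization above (assumes hadoop-common and the ignite-hadoop module on the classpath; the demo class name is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization;

public class WritableSerializationDemo {
    public static void main(String[] args) throws IgniteCheckedException {
        HadoopWritableSerialization ser =
            new HadoopWritableSerialization(IntWritable.class);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        // write() delegates straight to Writable.write(DataOutput).
        ser.write(new DataOutputStream(bos), new IntWritable(42));

        // Passing null makes read() instantiate a fresh IntWritable;
        // passing an existing instance would reuse it instead.
        Object res = ser.read(
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray())), null);

        System.out.println(res); // 42
    }
}
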
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
deleted file mode 100644
index 090b336..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- */
-public class HadoopJobMetadata implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- private HadoopJobId jobId;
-
- /** Job info. */
- private HadoopJobInfo jobInfo;
-
- /** Node submitted job. */
- private UUID submitNodeId;
-
- /** Map-reduce plan. */
- private HadoopMapReducePlan mrPlan;
-
- /** Pending splits for which mapper should be executed. */
- private Map<HadoopInputSplit, Integer> pendingSplits;
-
- /** Pending reducers. */
- private Collection<Integer> pendingReducers;
-
- /** Reducers addresses. */
- @GridToStringInclude
- private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
-
- /** Job phase. */
- private HadoopJobPhase phase = PHASE_SETUP;
-
- /** Fail cause. */
- @GridToStringExclude
- private Throwable failCause;
-
- /** Version. */
- private long ver;
-
- /** Job counters */
- private HadoopCounters counters = new HadoopCountersImpl();
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public HadoopJobMetadata() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param submitNodeId Submit node ID.
- * @param jobId Job ID.
- * @param jobInfo Job info.
- */
- public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) {
- this.jobId = jobId;
- this.jobInfo = jobInfo;
- this.submitNodeId = submitNodeId;
- }
-
- /**
- * Copy constructor.
- *
- * @param src Metadata to copy.
- */
- public HadoopJobMetadata(HadoopJobMetadata src) {
- // Make sure to preserve alphabetic order.
- counters = src.counters;
- failCause = src.failCause;
- jobId = src.jobId;
- jobInfo = src.jobInfo;
- mrPlan = src.mrPlan;
- pendingSplits = src.pendingSplits;
- pendingReducers = src.pendingReducers;
- phase = src.phase;
- reducersAddrs = src.reducersAddrs;
- submitNodeId = src.submitNodeId;
- ver = src.ver + 1;
- }
-
- /**
- * @return Submit node ID.
- */
- public UUID submitNodeId() {
- return submitNodeId;
- }
-
- /**
- * @param phase Job phase.
- */
- public void phase(HadoopJobPhase phase) {
- this.phase = phase;
- }
-
- /**
- * @return Job phase.
- */
- public HadoopJobPhase phase() {
- return phase;
- }
-
- /**
- * Gets reducers addresses for external execution.
- *
- * @return Reducers addresses.
- */
- public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
- return reducersAddrs;
- }
-
- /**
- * Sets reducers addresses for external execution.
- *
- * @param reducersAddrs Map of addresses.
- */
- public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
- this.reducersAddrs = reducersAddrs;
- }
-
- /**
- * Sets collection of pending splits.
- *
- * @param pendingSplits Collection of pending splits.
- */
- public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
- this.pendingSplits = pendingSplits;
- }
-
- /**
- * Gets collection of pending splits.
- *
- * @return Collection of pending splits.
- */
- public Map<HadoopInputSplit, Integer> pendingSplits() {
- return pendingSplits;
- }
-
- /**
- * Sets collection of pending reducers.
- *
- * @param pendingReducers Collection of pending reducers.
- */
- public void pendingReducers(Collection<Integer> pendingReducers) {
- this.pendingReducers = pendingReducers;
- }
-
- /**
- * Gets collection of pending reducers.
- *
- * @return Collection of pending reducers.
- */
- public Collection<Integer> pendingReducers() {
- return pendingReducers;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @param mrPlan Map-reduce plan.
- */
- public void mapReducePlan(HadoopMapReducePlan mrPlan) {
- assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
- this.mrPlan = mrPlan;
- }
-
- /**
- * @return Map-reduce plan.
- */
- public HadoopMapReducePlan mapReducePlan() {
- return mrPlan;
- }
-
- /**
- * @return Job info.
- */
- public HadoopJobInfo jobInfo() {
- return jobInfo;
- }
-
- /**
- * Returns job counters.
- *
- * @return Collection of counters.
- */
- public HadoopCounters counters() {
- return counters;
- }
-
- /**
- * Sets counters.
- *
- * @param counters Collection of counters.
- */
- public void counters(HadoopCounters counters) {
- this.counters = counters;
- }
-
- /**
- * @param failCause Fail cause.
- */
- public void failCause(Throwable failCause) {
- assert failCause != null;
-
- if (this.failCause == null) // Keep the first error.
- this.failCause = failCause;
- }
-
- /**
- * @return Fail cause.
- */
- public Throwable failCause() {
- return failCause;
- }
-
- /**
- * @return Version.
- */
- public long version() {
- return ver;
- }
-
- /**
- * @param split Split.
- * @return Task number.
- */
- public int taskNumber(HadoopInputSplit split) {
- return pendingSplits.get(split);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeUuid(out, submitNodeId);
- out.writeObject(jobId);
- out.writeObject(jobInfo);
- out.writeObject(mrPlan);
- out.writeObject(pendingSplits);
- out.writeObject(pendingReducers);
- out.writeObject(phase);
- out.writeObject(failCause);
- out.writeLong(ver);
- out.writeObject(reducersAddrs);
- out.writeObject(counters);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- submitNodeId = U.readUuid(in);
- jobId = (HadoopJobId)in.readObject();
- jobInfo = (HadoopJobInfo)in.readObject();
- mrPlan = (HadoopMapReducePlan)in.readObject();
- pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
- pendingReducers = (Collection<Integer>)in.readObject();
- phase = (HadoopJobPhase)in.readObject();
- failCause = (Throwable)in.readObject();
- ver = in.readLong();
- reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
- counters = (HadoopCounters)in.readObject();
- }
-
- /** {@inheritDoc} */
- public String toString() {
- return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
- "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
- failCause.getClass().getName());
- }
-}
\ No newline at end of file
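
Although HadoopJobMetadata is removed by this commit, it illustrates two patterns worth noting: Externalizable fields must be read back in exactly the order they were written, and the copy constructor bumps a version counter on every copy so concurrent updaters can detect stale metadata. A minimal sketch of both, under hypothetical names:

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class VersionedMetadata implements Externalizable {
    private static final long serialVersionUID = 0L;

    private String jobName;
    private long ver;

    /** Empty constructor required by {@link Externalizable}. */
    public VersionedMetadata() {
        // No-op.
    }

    public VersionedMetadata(String jobName) {
        this.jobName = jobName;
    }

    /** Copy constructor: every copy advances the version, as in HadoopJobMetadata. */
    public VersionedMetadata(VersionedMetadata src) {
        jobName = src.jobName;
        ver = src.ver + 1;
    }

    public long version() {
        return ver;
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(jobName);
        out.writeLong(ver);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException {
        // Same order as writeExternal: name first, then version.
        jobName = in.readUTF();
        ver = in.readLong();
    }
}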