You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:28 UTC
[17/51] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
deleted file mode 100644
index 48558fc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Fake manager for shutdown hooks.
- */
-public class GridHadoopShutdownHookManager {
- /** */
- private static final GridHadoopShutdownHookManager MGR = new GridHadoopShutdownHookManager();
-
- /**
- * Return <code>ShutdownHookManager</code> singleton.
- *
- * @return <code>ShutdownHookManager</code> singleton.
- */
- public static GridHadoopShutdownHookManager get() {
- return MGR;
- }
-
- /** */
- private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());
-
- /** */
- private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
-
- /**
- * Singleton.
- */
- private GridHadoopShutdownHookManager() {
- // No-op.
- }
-
- /**
- * Adds a shutdownHook with a priority, the higher the priority
- * the earlier will run. ShutdownHooks with same priority run
- * in a non-deterministic order.
- *
- * @param shutdownHook shutdownHook <code>Runnable</code>
- * @param priority priority of the shutdownHook.
- */
- public void addShutdownHook(Runnable shutdownHook, int priority) {
- if (shutdownHook == null)
- throw new IllegalArgumentException("shutdownHook cannot be NULL");
-
- hooks.add(shutdownHook);
- }
-
- /**
- * Removes a shutdownHook.
- *
- * @param shutdownHook shutdownHook to remove.
- * @return TRUE if the shutdownHook was registered and removed,
- * FALSE otherwise.
- */
- public boolean removeShutdownHook(Runnable shutdownHook) {
- return hooks.remove(shutdownHook);
- }
-
- /**
- * Indicates if a shutdownHook is registered or not.
- *
- * @param shutdownHook shutdownHook to check if registered.
- * @return TRUE/FALSE depending if the shutdownHook is is registered.
- */
- public boolean hasShutdownHook(Runnable shutdownHook) {
- return hooks.contains(shutdownHook);
- }
-
- /**
- * Indicates if shutdown is in progress or not.
- *
- * @return TRUE if the shutdown is in progress, otherwise FALSE.
- */
- public boolean isShutdownInProgress() {
- return shutdownInProgress.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
deleted file mode 100644
index 57edfa9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * The wrapper for native hadoop input splits.
- *
- * Warning!! This class must not depend on any Hadoop classes directly or indirectly.
- */
-public class GridHadoopSplitWrapper extends GridHadoopInputSplit {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Native hadoop input split. */
- private byte[] bytes;
-
- /** */
- private String clsName;
-
- /** Internal ID */
- private int id;
-
- /**
- * Creates new split wrapper.
- */
- public GridHadoopSplitWrapper() {
- // No-op.
- }
-
- /**
- * Creates new split wrapper.
- *
- * @param id Split ID.
- * @param clsName Class name.
- * @param bytes Serialized class.
- * @param hosts Hosts where split is located.
- */
- public GridHadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) {
- assert hosts != null;
- assert clsName != null;
- assert bytes != null;
-
- this.hosts = hosts;
- this.id = id;
-
- this.clsName = clsName;
- this.bytes = bytes;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(id);
-
- out.writeUTF(clsName);
- U.writeByteArray(out, bytes);
- }
-
- /**
- * @return Class name.
- */
- public String className() {
- return clsName;
- }
-
- /**
- * @return Class bytes.
- */
- public byte[] bytes() {
- return bytes;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- id = in.readInt();
-
- clsName = in.readUTF();
- bytes = U.readByteArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridHadoopSplitWrapper that = (GridHadoopSplitWrapper)o;
-
- return id == that.id;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return id;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
deleted file mode 100644
index 38be3da..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-
-import java.io.*;
-
-/**
- * Hadoop cleanup task (commits or aborts job).
- */
-public class GridHadoopV2CleanupTask extends GridHadoopV2Task {
- /** Abort flag. */
- private final boolean abort;
-
- /**
- * @param taskInfo Task info.
- * @param abort Abort flag.
- */
- public GridHadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
- super(taskInfo);
-
- this.abort = abort;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException {
- JobContextImpl jobCtx = taskCtx.jobContext();
-
- try {
- OutputFormat outputFormat = getOutputFormat(jobCtx);
-
- OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
-
- if (committer != null) {
- if (abort)
- committer.abortJob(jobCtx, JobStatus.State.FAILED);
- else
- committer.commitJob(jobCtx);
- }
- }
- catch (ClassNotFoundException | IOException e) {
- throw new IgniteCheckedException(e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
deleted file mode 100644
index 287b10f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.task.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks.
- */
-public class GridHadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
- /** Input reader to overriding of GridHadoopTaskContext input. */
- private RecordReader reader;
-
- /** Output writer to overriding of GridHadoopTaskContext output. */
- private RecordWriter writer;
-
- /** Output is provided by executor environment. */
- private final GridHadoopTaskOutput output;
-
- /** Input is provided by executor environment. */
- private final GridHadoopTaskInput input;
-
- /** Unique identifier for a task attempt. */
- private final TaskAttemptID taskAttemptID;
-
- /** Indicates that this task is to be cancelled. */
- private volatile boolean cancelled;
-
- /** Input split. */
- private InputSplit inputSplit;
-
- /** */
- private final GridHadoopTaskContext ctx;
-
- /** */
- private String status;
-
- /**
- * @param ctx Context for IO operations.
- */
- public GridHadoopV2Context(GridHadoopV2TaskContext ctx) {
- super(ctx.jobConf(), ctx.jobContext().getJobID());
-
- taskAttemptID = ctx.attemptId();
-
- conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString());
- conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString());
-
- output = ctx.output();
- input = ctx.input();
-
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public InputSplit getInputSplit() {
- if (inputSplit == null) {
- GridHadoopInputSplit split = ctx.taskInfo().inputSplit();
-
- if (split == null)
- return null;
-
- if (split instanceof GridHadoopFileBlock) {
- GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split;
-
- inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
- }
- else if (split instanceof GridHadoopExternalSplit)
- throw new UnsupportedOperationException(); // TODO
- else if (split instanceof GridHadoopSplitWrapper)
- inputSplit = (InputSplit)GridHadoopUtils.unwrapSplit((GridHadoopSplitWrapper)split);
- else
- throw new IllegalStateException();
- }
-
- return inputSplit;
- }
-
- /** {@inheritDoc} */
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
-
- return reader.nextKeyValue();
- }
-
- /** {@inheritDoc} */
- @Override public Object getCurrentKey() throws IOException, InterruptedException {
- if (reader != null)
- return reader.getCurrentKey();
-
- return input.key();
- }
-
- /** {@inheritDoc} */
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
- return reader.getCurrentValue();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void write(Object key, Object val) throws IOException, InterruptedException {
- if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
-
- if (writer != null)
- writer.write(key, val);
- else {
- try {
- output.write(key, val);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public OutputCommitter getOutputCommitter() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public TaskAttemptID getTaskAttemptID() {
- return taskAttemptID;
- }
-
- /** {@inheritDoc} */
- @Override public void setStatus(String msg) {
- status = msg;
- }
-
- /** {@inheritDoc} */
- @Override public String getStatus() {
- return status;
- }
-
- /** {@inheritDoc} */
- @Override public float getProgress() {
- return 0.5f; // TODO
- }
-
- /** {@inheritDoc} */
- @Override public Counter getCounter(Enum<?> cntrName) {
- return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name());
- }
-
- /** {@inheritDoc} */
- @Override public Counter getCounter(String grpName, String cntrName) {
- return new GridHadoopV2Counter(ctx.counter(grpName, cntrName, GridHadoopLongCounter.class));
- }
-
- /** {@inheritDoc} */
- @Override public void progress() {
- // No-op.
- }
-
- /**
- * Overrides default input data reader.
- *
- * @param reader New reader.
- */
- public void reader(RecordReader reader) {
- this.reader = reader;
- }
-
- /** {@inheritDoc} */
- @Override public boolean nextKey() throws IOException, InterruptedException {
- if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
-
- return input.next();
- }
-
- /** {@inheritDoc} */
- @Override public Iterable getValues() throws IOException, InterruptedException {
- return new Iterable() {
- @Override public Iterator iterator() {
- return input.values();
- }
- };
- }
-
- /**
- * @return Overridden output data writer.
- */
- public RecordWriter writer() {
- return writer;
- }
-
- /**
- * Overrides default output data writer.
- *
- * @param writer New writer.
- */
- public void writer(RecordWriter writer) {
- this.writer = writer;
- }
-
- /**
- * Cancels the task by stop the IO.
- */
- public void cancel() {
- cancelled = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java
deleted file mode 100644
index 6bf8a44..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-import java.io.*;
-
-/**
- * Adapter from own counter implementation into Hadoop API Counter od version 2.0.
- */
-public class GridHadoopV2Counter implements Counter {
- /** Delegate. */
- private final GridHadoopLongCounter cntr;
-
- /**
- * Creates new instance with given delegate.
- *
- * @param cntr Internal counter.
- */
- public GridHadoopV2Counter(GridHadoopLongCounter cntr) {
- assert cntr != null : "counter must be non-null";
-
- this.cntr = cntr;
- }
-
- /** {@inheritDoc} */
- @Override public void setDisplayName(String displayName) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return cntr.name();
- }
-
- /** {@inheritDoc} */
- @Override public String getDisplayName() {
- return getName();
- }
-
- /** {@inheritDoc} */
- @Override public long getValue() {
- return cntr.value();
- }
-
- /** {@inheritDoc} */
- @Override public void setValue(long val) {
- cntr.value(val);
- }
-
- /** {@inheritDoc} */
- @Override public void increment(long incr) {
- cntr.increment(incr);
- }
-
- /** {@inheritDoc} */
- @Override public Counter getUnderlyingCounter() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
deleted file mode 100644
index 7c36948..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.split.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.internal.processors.hadoop.v1.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.*;
-import java.util.Queue;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Hadoop job implementation for v2 API.
- */
-public class GridHadoopV2Job implements GridHadoopJob {
- /** */
- private final JobConf jobConf;
-
- /** */
- private final JobContextImpl jobCtx;
-
- /** Hadoop job ID. */
- private final GridHadoopJobId jobId;
-
- /** Job info. */
- protected GridHadoopJobInfo jobInfo;
-
- /** */
- private final JobID hadoopJobID;
-
- /** */
- private final GridHadoopV2JobResourceManager rsrcMgr;
-
- /** */
- private final ConcurrentMap<T2<GridHadoopTaskType, Integer>, GridFutureAdapter<GridHadoopTaskContext>> ctxs =
- new ConcurrentHashMap8<>();
-
- /** Pooling task context class and thus class loading environment. */
- private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
-
- /** Local node ID */
- private UUID locNodeId;
-
- /** Serialized JobConf. */
- private volatile byte[] jobConfData;
-
- /**
- * @param jobId Job ID.
- * @param jobInfo Job info.
- * @param log Logger.
- */
- public GridHadoopV2Job(GridHadoopJobId jobId, final GridHadoopDefaultJobInfo jobInfo, IgniteLogger log) {
- assert jobId != null;
- assert jobInfo != null;
-
- this.jobId = jobId;
- this.jobInfo = jobInfo;
-
- hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
-
- GridHadoopClassLoader clsLdr = (GridHadoopClassLoader)getClass().getClassLoader();
-
- // Before create JobConf instance we should set new context class loader.
- Thread.currentThread().setContextClassLoader(clsLdr);
-
- jobConf = new JobConf();
-
- GridHadoopFileSystemsUtils.setupFileSystems(jobConf);
-
- Thread.currentThread().setContextClassLoader(null);
-
- for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
- jobConf.set(e.getKey(), e.getValue());
-
- jobCtx = new JobContextImpl(jobConf, hadoopJobID);
-
- rsrcMgr = new GridHadoopV2JobResourceManager(jobId, jobCtx, log);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobId id() {
- return jobId;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobInfo info() {
- return jobInfo;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException {
- Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
-
- try {
- String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
-
- if (jobDirPath == null) { // Probably job was submitted not by hadoop client.
- // Assume that we have needed classes and try to generate input splits ourself.
- if (jobConf.getUseNewMapper())
- return GridHadoopV2Splitter.splitJob(jobCtx);
- else
- return GridHadoopV1Splitter.splitJob(jobConf);
- }
-
- Path jobDir = new Path(jobDirPath);
-
- try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
- JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
- jobDir);
-
- if (F.isEmpty(metaInfos))
- throw new IgniteCheckedException("No input splits found.");
-
- Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir);
-
- try (FSDataInputStream in = fs.open(splitsFile)) {
- Collection<GridHadoopInputSplit> res = new ArrayList<>(metaInfos.length);
-
- for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
- long off = metaInfo.getStartOffset();
-
- String[] hosts = metaInfo.getLocations();
-
- in.seek(off);
-
- String clsName = Text.readString(in);
-
- GridHadoopFileBlock block = GridHadoopV1Splitter.readFileBlock(clsName, in, hosts);
-
- if (block == null)
- block = GridHadoopV2Splitter.readFileBlock(clsName, in, hosts);
-
- res.add(block != null ? block : new GridHadoopExternalSplit(hosts, off));
- }
-
- return res;
- }
- }
- catch (Throwable e) {
- throw transformException(e);
- }
- }
- finally {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException {
- T2<GridHadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber());
-
- GridFutureAdapter<GridHadoopTaskContext> fut = ctxs.get(locTaskId);
-
- if (fut != null)
- return fut.get();
-
- GridFutureAdapter<GridHadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>());
-
- if (old != null)
- return old.get();
-
- Class<?> cls = taskCtxClsPool.poll();
-
- try {
- if (cls == null) {
- // If there is no pooled class, then load new one.
- GridHadoopClassLoader ldr = new GridHadoopClassLoader(rsrcMgr.classPath());
-
- cls = ldr.loadClass(GridHadoopV2TaskContext.class.getName());
- }
-
- Constructor<?> ctr = cls.getConstructor(GridHadoopTaskInfo.class, GridHadoopJob.class,
- GridHadoopJobId.class, UUID.class, DataInput.class);
-
- if (jobConfData == null)
- synchronized(jobConf) {
- if (jobConfData == null) {
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
- jobConf.write(new DataOutputStream(buf));
-
- jobConfData = buf.toByteArray();
- }
- }
-
- GridHadoopTaskContext res = (GridHadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId,
- new DataInputStream(new ByteArrayInputStream(jobConfData)));
-
- fut.onDone(res);
-
- return res;
- }
- catch (Throwable e) {
- IgniteCheckedException te = transformException(e);
-
- fut.onDone(te);
-
- throw te;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
- this.locNodeId = locNodeId;
-
- Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
-
- try {
- rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId));
- }
- finally {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void dispose(boolean external) throws IgniteCheckedException {
- if (rsrcMgr != null && !external) {
- File jobLocDir = jobLocalDir(locNodeId, jobId);
-
- if (jobLocDir.exists())
- U.delete(jobLocDir);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException {
- rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException {
- GridHadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get();
-
- taskCtxClsPool.offer(ctx.getClass());
-
- File locDir = taskLocalDir(locNodeId, info);
-
- if (locDir.exists())
- U.delete(locDir);
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupStagingDirectory() {
- if (rsrcMgr != null)
- rsrcMgr.cleanupStagingDirectory();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java
deleted file mode 100644
index be619c7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.file.*;
-import java.util.*;
-
-/**
- * Provides all resources are needed to the job execution. Downloads the main jar, the configuration and additional
- * files are needed to be placed on local files system.
- */
-public class GridHadoopV2JobResourceManager {
- /** Hadoop job context. */
- private final JobContextImpl ctx;
-
- /** Logger. */
- private final IgniteLogger log;
-
- /** Job ID. */
- private final GridHadoopJobId jobId;
-
- /** Class path list. */
- private URL[] clsPath;
-
- /** Set of local resources. */
- private final Collection<File> rsrcSet = new HashSet<>();
-
- /** Staging directory to delivery job jar and config to the work nodes. */
- private Path stagingDir;
-
- /**
- * Creates new instance.
- * @param jobId Job ID.
- * @param ctx Hadoop job context.
- * @param log Logger.
- */
- public GridHadoopV2JobResourceManager(GridHadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) {
- this.jobId = jobId;
- this.ctx = ctx;
- this.log = log.getLogger(GridHadoopV2JobResourceManager.class);
- }
-
- /**
- * Set working directory in local file system.
- *
- * @param dir Working directory.
- * @throws IOException If fails.
- */
- private void setLocalFSWorkingDirectory(File dir) throws IOException {
- JobConf cfg = ctx.getJobConf();
-
- Thread.currentThread().setContextClassLoader(cfg.getClassLoader());
-
- try {
- cfg.set(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
-
- if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
- FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
- }
- finally {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
-
- /**
- * Prepare job resources. Resolve the classpath list and download it if needed.
- *
- * @param download {@code true} If need to download resources.
- * @param jobLocDir Work directory for the job.
- * @throws IgniteCheckedException If failed.
- */
- public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException {
- try {
- if (jobLocDir.exists())
- throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath());
-
- JobConf cfg = ctx.getJobConf();
-
- String mrDir = cfg.get("mapreduce.job.dir");
-
- if (mrDir != null) {
- stagingDir = new Path(new URI(mrDir));
-
- if (download) {
- FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
-
- if (!fs.exists(stagingDir))
- throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
- stagingDir);
-
- if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
- throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
- "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
- }
-
- File jarJobFile = new File(jobLocDir, "job.jar");
-
- Collection<URL> clsPathUrls = new ArrayList<>();
-
- clsPathUrls.add(jarJobFile.toURI().toURL());
-
- rsrcSet.add(jarJobFile);
- rsrcSet.add(new File(jobLocDir, "job.xml"));
-
- processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
- processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
- processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null);
- processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null);
-
- if (!clsPathUrls.isEmpty()) {
- clsPath = new URL[clsPathUrls.size()];
-
- clsPathUrls.toArray(clsPath);
- }
- }
- else if (!jobLocDir.mkdirs())
- throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
-
- setLocalFSWorkingDirectory(jobLocDir);
- }
- catch (URISyntaxException | IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * Process list of resources.
- *
- * @param jobLocDir Job working directory.
- * @param files Array of {@link java.net.URI} or {@link org.apache.hadoop.fs.Path} to process resources.
- * @param download {@code true}, if need to download. Process class path only else.
- * @param extract {@code true}, if need to extract archive.
- * @param clsPathUrls Collection to add resource as classpath resource.
- * @param rsrcNameProp Property for resource name array setting.
- * @throws IOException If failed.
- */
- private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract,
- @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException {
- if (F.isEmptyOrNulls(files))
- return;
-
- Collection<String> res = new ArrayList<>();
-
- for (Object pathObj : files) {
- String locName = null;
- Path srcPath;
-
- if (pathObj instanceof URI) {
- URI uri = (URI)pathObj;
-
- locName = uri.getFragment();
-
- srcPath = new Path(uri);
- }
- else
- srcPath = (Path)pathObj;
-
- if (locName == null)
- locName = srcPath.getName();
-
- File dstPath = new File(jobLocDir.getAbsolutePath(), locName);
-
- res.add(locName);
-
- rsrcSet.add(dstPath);
-
- if (clsPathUrls != null)
- clsPathUrls.add(dstPath.toURI().toURL());
-
- if (!download)
- continue;
-
- JobConf cfg = ctx.getJobConf();
-
- FileSystem dstFs = FileSystem.getLocal(cfg);
-
- FileSystem srcFs = srcPath.getFileSystem(cfg);
-
- if (extract) {
- File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
-
- if (!archivesPath.exists() && !archivesPath.mkdir())
- throw new IOException("Failed to create directory " +
- "[path=" + archivesPath + ", jobId=" + jobId + ']');
-
- File archiveFile = new File(archivesPath, locName);
-
- FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg);
-
- String archiveNameLC = archiveFile.getName().toLowerCase();
-
- if (archiveNameLC.endsWith(".jar"))
- RunJar.unJar(archiveFile, dstPath);
- else if (archiveNameLC.endsWith(".zip"))
- FileUtil.unZip(archiveFile, dstPath);
- else if (archiveNameLC.endsWith(".tar.gz") ||
- archiveNameLC.endsWith(".tgz") ||
- archiveNameLC.endsWith(".tar"))
- FileUtil.unTar(archiveFile, dstPath);
- else
- throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']');
- }
- else
- FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg);
- }
-
- if (!res.isEmpty() && rsrcNameProp != null)
- ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()]));
- }
-
- /**
- * Prepares working directory for the task.
- *
- * <ul>
- * <li>Creates working directory.</li>
- * <li>Creates symbolic links to all job resources in working directory.</li>
- * </ul>
- *
- * @param path Path to working directory of the task.
- * @throws IgniteCheckedException If fails.
- */
- public void prepareTaskWorkDir(File path) throws IgniteCheckedException {
- try {
- if (path.exists())
- throw new IOException("Task local directory already exists: " + path);
-
- if (!path.mkdir())
- throw new IOException("Failed to create directory: " + path);
-
- for (File resource : rsrcSet) {
- File symLink = new File(path, resource.getName());
-
- try {
- Files.createSymbolicLink(symLink.toPath(), resource.toPath());
- }
- catch (IOException e) {
- String msg = "Unable to create symlink \"" + symLink + "\" to \"" + resource + "\".";
-
- if (U.isWindows() && e instanceof FileSystemException)
- msg += "\n\nAbility to create symbolic links is required!\n" +
- "On Windows platform you have to grant permission 'Create symbolic links'\n" +
- "to your user or run the Accelerator as Administrator.\n";
-
- throw new IOException(msg, e);
- }
- }
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Unable to prepare local working directory for the task " +
- "[jobId=" + jobId + ", path=" + path+ ']', e);
- }
- }
-
- /**
- * Cleans up job staging directory.
- */
- public void cleanupStagingDirectory() {
- try {
- if (stagingDir != null)
- stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
- }
- catch (Exception e) {
- log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
- }
- }
-
- /**
- * Returns array of class path for current job.
- *
- * @return Class path collection.
- */
- @Nullable public URL[] classPath() {
- return clsPath;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java
deleted file mode 100644
index be0bea2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.map.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-
-/**
- * Hadoop map task implementation for v2 API.
- */
-public class GridHadoopV2MapTask extends GridHadoopV2Task {
- /**
- * @param taskInfo Task info.
- */
- public GridHadoopV2MapTask(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"ConstantConditions", "unchecked"})
- @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopInputSplit split = info().inputSplit();
-
- InputSplit nativeSplit;
-
- if (split instanceof GridHadoopFileBlock) {
- GridHadoopFileBlock block = (GridHadoopFileBlock)split;
-
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null);
- }
- else
- nativeSplit = (InputSplit)taskCtx.getNativeSplit(split);
-
- assert nativeSplit != null;
-
- OutputFormat outputFormat = null;
- Exception err = null;
-
- JobContextImpl jobCtx = taskCtx.jobContext();
-
- try {
- InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
- hadoopContext().getConfiguration());
-
- RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
-
- reader.initialize(nativeSplit, hadoopContext());
-
- hadoopContext().reader(reader);
-
- GridHadoopJobInfo jobInfo = taskCtx.job().info();
-
- outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
-
- Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
-
- try {
- mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
- }
- finally {
- closeWriter();
- }
-
- commit(outputFormat);
- }
- catch (InterruptedException e) {
- err = e;
-
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- catch (Exception e) {
- err = e;
-
- throw new IgniteCheckedException(e);
- }
- finally {
- if (err != null)
- abort(outputFormat);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java
deleted file mode 100644
index 0883520..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Hadoop partitioner adapter for v2 API.
- */
-public class GridHadoopV2Partitioner implements GridHadoopPartitioner {
- /** Partitioner instance. */
- private Partitioner<Object, Object> part;
-
- /**
- * @param cls Hadoop partitioner class.
- * @param conf Job configuration.
- */
- public GridHadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) {
- part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
- }
-
- /** {@inheritDoc} */
- @Override public int partition(Object key, Object val, int parts) {
- return part.getPartition(key, val, parts);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java
deleted file mode 100644
index 146e05c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.reduce.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-
-/**
- * Hadoop reduce task implementation for v2 API.
- */
-public class GridHadoopV2ReduceTask extends GridHadoopV2Task {
- /** {@code True} if reduce, {@code false} if combine. */
- private final boolean reduce;
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- * @param reduce {@code True} if reduce, {@code false} if combine.
- */
- public GridHadoopV2ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
- super(taskInfo);
-
- this.reduce = reduce;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"ConstantConditions", "unchecked"})
- @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException {
- OutputFormat outputFormat = null;
- Exception err = null;
-
- JobContextImpl jobCtx = taskCtx.jobContext();
-
- try {
- outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null;
-
- Reducer reducer = ReflectionUtils.newInstance(reduce ? jobCtx.getReducerClass() : jobCtx.getCombinerClass(),
- jobCtx.getConfiguration());
-
- try {
- reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
- }
- finally {
- closeWriter();
- }
-
- commit(outputFormat);
- }
- catch (InterruptedException e) {
- err = e;
-
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- catch (Exception e) {
- err = e;
-
- throw new IgniteCheckedException(e);
- }
- finally {
- if (err != null)
- abort(outputFormat);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java
deleted file mode 100644
index 54eda25..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-
-import java.io.*;
-
-/**
- * Hadoop setup task (prepares job).
- */
-public class GridHadoopV2SetupTask extends GridHadoopV2Task {
- /**
- * Constructor.
- *
- * @param taskInfo task info.
- */
- public GridHadoopV2SetupTask(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override protected void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException {
- try {
- JobContextImpl jobCtx = taskCtx.jobContext();
-
- OutputFormat outputFormat = getOutputFormat(jobCtx);
-
- outputFormat.checkOutputSpecs(jobCtx);
-
- OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
-
- if (committer != null)
- committer.setupJob(jobCtx);
- }
- catch (ClassNotFoundException | IOException e) {
- throw new IgniteCheckedException(e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
deleted file mode 100644
index e8ce70b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop API v2 splitter.
- */
-public class GridHadoopV2Splitter {
- /** */
- private static final String[] EMPTY_HOSTS = {};
-
- /**
- * @param ctx Job context.
- * @return Collection of mapped splits.
- * @throws IgniteCheckedException If mapping failed.
- */
- public static Collection<GridHadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException {
- try {
- InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration());
-
- assert format != null;
-
- List<InputSplit> splits = format.getSplits(ctx);
-
- Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.size());
-
- int id = 0;
-
- for (InputSplit nativeSplit : splits) {
- if (nativeSplit instanceof FileSplit) {
- FileSplit s = (FileSplit)nativeSplit;
-
- res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
- }
- else
- res.add(GridHadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations()));
-
- id++;
- }
-
- return res;
- }
- catch (IOException | ClassNotFoundException e) {
- throw new IgniteCheckedException(e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-
- /**
- * @param clsName Input split class name.
- * @param in Input stream.
- * @param hosts Optional hosts.
- * @return File block or {@code null} if it is not a {@link FileSplit} instance.
- * @throws IgniteCheckedException If failed.
- */
- public static GridHadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts)
- throws IgniteCheckedException {
- if (!FileSplit.class.getName().equals(clsName))
- return null;
-
- FileSplit split = new FileSplit();
-
- try {
- split.readFields(in);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- if (hosts == null)
- hosts = EMPTY_HOSTS;
-
- return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java
deleted file mode 100644
index 37697c6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Extended Hadoop v2 task.
- */
-public abstract class GridHadoopV2Task extends GridHadoopTask {
- /** Hadoop context. */
- private GridHadoopV2Context hadoopCtx;
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- */
- protected GridHadoopV2Task(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /** {@inheritDoc} */
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- hadoopCtx = new GridHadoopV2Context(ctx);
-
- run0(ctx);
- }
-
- /**
- * Internal task routine.
- *
- * @param taskCtx Task context.
- * @throws IgniteCheckedException
- */
- protected abstract void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException;
-
- /**
- * @return hadoop context.
- */
- protected GridHadoopV2Context hadoopContext() {
- return hadoopCtx;
- }
-
- /**
- * Create and configure an OutputFormat instance.
- *
- * @param jobCtx Job context.
- * @return Instance of OutputFormat is specified in job configuration.
- * @throws ClassNotFoundException If specified class not found.
- */
- protected OutputFormat getOutputFormat(JobContext jobCtx) throws ClassNotFoundException {
- return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), hadoopContext().getConfiguration());
- }
-
- /**
- * Put write into Hadoop context and return associated output format instance.
- *
- * @param jobCtx Job context.
- * @return Output format.
- * @throws IgniteCheckedException In case of Grid exception.
- * @throws InterruptedException In case of interrupt.
- */
- protected OutputFormat prepareWriter(JobContext jobCtx)
- throws IgniteCheckedException, InterruptedException {
- try {
- OutputFormat outputFormat = getOutputFormat(jobCtx);
-
- assert outputFormat != null;
-
- OutputCommitter outCommitter = outputFormat.getOutputCommitter(hadoopCtx);
-
- if (outCommitter != null)
- outCommitter.setupTask(hadoopCtx);
-
- RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx);
-
- hadoopCtx.writer(writer);
-
- return outputFormat;
- }
- catch (IOException | ClassNotFoundException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * Closes writer.
- *
- * @throws Exception If fails and logger hasn't been specified.
- */
- protected void closeWriter() throws Exception {
- RecordWriter writer = hadoopCtx.writer();
-
- if (writer != null)
- writer.close(hadoopCtx);
- }
-
- /**
- * Setup task.
- *
- * @param outputFormat Output format.
- * @throws IOException In case of IO exception.
- * @throws InterruptedException In case of interrupt.
- */
- protected void setup(@Nullable OutputFormat outputFormat) throws IOException, InterruptedException {
- if (hadoopCtx.writer() != null) {
- assert outputFormat != null;
-
- outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx);
- }
- }
-
- /**
- * Commit task.
- *
- * @param outputFormat Output format.
- * @throws IgniteCheckedException In case of Grid exception.
- * @throws IOException In case of IO exception.
- * @throws InterruptedException In case of interrupt.
- */
- protected void commit(@Nullable OutputFormat outputFormat) throws IgniteCheckedException, IOException, InterruptedException {
- if (hadoopCtx.writer() != null) {
- assert outputFormat != null;
-
- OutputCommitter outputCommitter = outputFormat.getOutputCommitter(hadoopCtx);
-
- if (outputCommitter.needsTaskCommit(hadoopCtx))
- outputCommitter.commitTask(hadoopCtx);
- }
- }
-
- /**
- * Abort task.
- *
- * @param outputFormat Output format.
- */
- protected void abort(@Nullable OutputFormat outputFormat) {
- if (hadoopCtx.writer() != null) {
- assert outputFormat != null;
-
- try {
- outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx);
- }
- catch (IOException ignore) {
- // Ignore.
- }
- catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- hadoopCtx.cancel();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
deleted file mode 100644
index 41bd24a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.serializer.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.internal.processors.hadoop.v1.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Context for task execution.
- */
-public class GridHadoopV2TaskContext extends GridHadoopTaskContext {
- /** */
- private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
-
- /**
- * Check for combiner grouping support (available since Hadoop 2.3).
- */
- static {
- boolean ok;
-
- try {
- JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
-
- ok = true;
- }
- catch (NoSuchMethodException ignore) {
- ok = false;
- }
-
- COMBINE_KEY_GROUPING_SUPPORTED = ok;
- }
-
- /** Flag is set if new context-object code is used for running the mapper. */
- private final boolean useNewMapper;
-
- /** Flag is set if new context-object code is used for running the reducer. */
- private final boolean useNewReducer;
-
- /** Flag is set if new context-object code is used for running the combiner. */
- private final boolean useNewCombiner;
-
- /** */
- private final JobContextImpl jobCtx;
-
- /** Set if task is to cancelling. */
- private volatile boolean cancelled;
-
- /** Current task. */
- private volatile GridHadoopTask task;
-
- /** Local node ID */
- private UUID locNodeId;
-
- /** Counters for task. */
- private final GridHadoopCounters cntrs = new GridHadoopCountersImpl();
-
- /**
- * @param taskInfo Task info.
- * @param job Job.
- * @param jobId Job ID.
- * @param locNodeId Local node ID.
- * @param jobConfDataInput DataInput for read JobConf.
- */
- public GridHadoopV2TaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job, GridHadoopJobId jobId,
- @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
- super(taskInfo, job);
- this.locNodeId = locNodeId;
-
- // Before create JobConf instance we should set new context class loader.
- Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-
- try {
- JobConf jobConf = new JobConf();
-
- try {
- jobConf.readFields(jobConfDataInput);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- // For map-reduce jobs prefer local writes.
- jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
-
- jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
-
- useNewMapper = jobConf.getUseNewMapper();
- useNewReducer = jobConf.getUseNewReducer();
- useNewCombiner = jobConf.getCombinerClass() == null;
- }
- finally {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
- return cntrs.counter(grp, name, cls);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopCounters counters() {
- return cntrs;
- }
-
- /**
- * Creates appropriate task from current task info.
- *
- * @return Task.
- */
- private GridHadoopTask createTask() {
- boolean isAbort = taskInfo().type() == GridHadoopTaskType.ABORT;
-
- switch (taskInfo().type()) {
- case SETUP:
- return useNewMapper ? new GridHadoopV2SetupTask(taskInfo()) : new GridHadoopV1SetupTask(taskInfo());
-
- case MAP:
- return useNewMapper ? new GridHadoopV2MapTask(taskInfo()) : new GridHadoopV1MapTask(taskInfo());
-
- case REDUCE:
- return useNewReducer ? new GridHadoopV2ReduceTask(taskInfo(), true) :
- new GridHadoopV1ReduceTask(taskInfo(), true);
-
- case COMBINE:
- return useNewCombiner ? new GridHadoopV2ReduceTask(taskInfo(), false) :
- new GridHadoopV1ReduceTask(taskInfo(), false);
-
- case COMMIT:
- case ABORT:
- return useNewReducer ? new GridHadoopV2CleanupTask(taskInfo(), isAbort) :
- new GridHadoopV1CleanupTask(taskInfo(), isAbort);
-
- default:
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void run() throws IgniteCheckedException {
- try {
- Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
-
- try {
- task = createTask();
- }
- catch (Throwable e) {
- throw transformException(e);
- }
-
- if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
-
- try {
- task.run(this);
- }
- catch (Throwable e) {
- throw transformException(e);
- }
- }
- finally {
- task = null;
-
- Thread.currentThread().setContextClassLoader(null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- cancelled = true;
-
- GridHadoopTask t = task;
-
- if (t != null)
- t.cancel();
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
- File locDir;
-
- switch(taskInfo().type()) {
- case MAP:
- case REDUCE:
- job().prepareTaskEnvironment(taskInfo());
-
- locDir = taskLocalDir(locNodeId, taskInfo());
-
- break;
-
- default:
- locDir = jobLocalDir(locNodeId, taskInfo().jobId());
- }
-
- Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
-
- try {
- FileSystem fs = FileSystem.get(jobConf());
-
- GridHadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
-
- LocalFileSystem locFs = FileSystem.getLocal(jobConf());
-
- locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
- }
- catch (Throwable e) {
- throw transformException(e);
- }
- finally {
- Thread.currentThread().setContextClassLoader(null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
- job().cleanupTaskEnvironment(taskInfo());
- }
-
- /**
- * Creates Hadoop attempt ID.
- *
- * @return Attempt ID.
- */
- public TaskAttemptID attemptId() {
- TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
-
- return new TaskAttemptID(tid, taskInfo().attempt());
- }
-
- /**
- * @param type Task type.
- * @return Hadoop task type.
- */
- private TaskType taskType(GridHadoopTaskType type) {
- switch (type) {
- case SETUP:
- return TaskType.JOB_SETUP;
- case MAP:
- case COMBINE:
- return TaskType.MAP;
-
- case REDUCE:
- return TaskType.REDUCE;
-
- case COMMIT:
- case ABORT:
- return TaskType.JOB_CLEANUP;
-
- default:
- return null;
- }
- }
-
- /**
- * Gets job configuration of the task.
- *
- * @return Job configuration.
- */
- public JobConf jobConf() {
- return jobCtx.getJobConf();
- }
-
- /**
- * Gets job context of the task.
- *
- * @return Job context.
- */
- public JobContextImpl jobContext() {
- return jobCtx;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException {
- Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
-
- if (partClsOld != null)
- return new GridHadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
-
- try {
- return new GridHadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
- }
- catch (ClassNotFoundException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * Gets serializer for specified class.
- *
- * @param cls Class.
- * @param jobConf Job configuration.
- * @return Appropriate serializer.
- */
- @SuppressWarnings("unchecked")
- private GridHadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
- A.notNull(cls, "cls");
-
- SerializationFactory factory = new SerializationFactory(jobConf);
-
- Serialization<?> serialization = factory.getSerialization(cls);
-
- if (serialization == null)
- throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
-
- if (serialization.getClass() == WritableSerialization.class)
- return new GridHadoopWritableSerialization((Class<? extends Writable>)cls);
-
- return new GridHadoopSerializationWrapper(serialization, cls);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException {
- return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException {
- return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
- }
-
- /** {@inheritDoc} */
- @Override public Comparator<Object> sortComparator() {
- return (Comparator<Object>)jobCtx.getSortComparator();
- }
-
- /** {@inheritDoc} */
- @Override public Comparator<Object> groupComparator() {
- Comparator<?> res;
-
- switch (taskInfo().type()) {
- case COMBINE:
- res = COMBINE_KEY_GROUPING_SUPPORTED ?
- jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
-
- break;
-
- case REDUCE:
- res = jobContext().getGroupingComparator();
-
- break;
-
- default:
- return null;
- }
-
- if (res != null && res.getClass() != sortComparator().getClass())
- return (Comparator<Object>)res;
-
- return null;
- }
-
- /**
- * @param split Split.
- * @return Native Hadoop split.
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- public Object getNativeSplit(GridHadoopInputSplit split) throws IgniteCheckedException {
- if (split instanceof GridHadoopExternalSplit)
- return readExternalSplit((GridHadoopExternalSplit)split);
-
- if (split instanceof GridHadoopSplitWrapper)
- return unwrapSplit((GridHadoopSplitWrapper)split);
-
- throw new IllegalStateException("Unknown split: " + split);
- }
-
- /**
- * @param split External split.
- * @return Native input split.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- private Object readExternalSplit(GridHadoopExternalSplit split) throws IgniteCheckedException {
- Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
-
- try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
- FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
-
- in.seek(split.offset());
-
- String clsName = Text.readString(in);
-
- Class<?> cls = jobConf().getClassByName(clsName);
-
- assert cls != null;
-
- Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
-
- Deserializer deserializer = serialization.getDeserializer(cls);
-
- deserializer.open(in);
-
- Object res = deserializer.deserialize(null);
-
- deserializer.close();
-
- assert res != null;
-
- return res;
- }
- catch (IOException | ClassNotFoundException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java
deleted file mode 100644
index 4361ad4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.io.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Optimized serialization for Hadoop {@link Writable} types.
- */
-public class GridHadoopWritableSerialization implements GridHadoopSerialization {
- /** */
- private final Class<? extends Writable> cls;
-
- /**
- * @param cls Class.
- */
- public GridHadoopWritableSerialization(Class<? extends Writable> cls) {
- assert cls != null;
-
- this.cls = cls;
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
- assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
-
- try {
- ((Writable)obj).write(out);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
- Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
-
- try {
- w.readFields(in);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- return w;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- // No-op.
- }
-}