You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:35 UTC
[19/31] incubator-ignite git commit: # IGNITE-386: WIP on internal
namings (3).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
deleted file mode 100644
index 2a38684..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Hadoop output collector.
- */
-public class GridHadoopV1OutputCollector implements OutputCollector {
- /** Job configuration. */
- private final JobConf jobConf;
-
- /** Task context. */
- private final GridHadoopTaskContext taskCtx;
-
- /** Optional direct writer. */
- private final RecordWriter writer;
-
- /** Task attempt. */
- private final TaskAttemptID attempt;
-
- /**
- * @param jobConf Job configuration.
- * @param taskCtx Task context.
- * @param directWrite Direct write flag.
- * @param fileName File name.
- * @throws IOException In case of IO exception.
- */
- GridHadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite,
- @Nullable String fileName, TaskAttemptID attempt) throws IOException {
- this.jobConf = jobConf;
- this.taskCtx = taskCtx;
- this.attempt = attempt;
-
- if (directWrite) {
- jobConf.set("mapreduce.task.attempt.id", attempt.toString());
-
- OutputFormat outFormat = jobConf.getOutputFormat();
-
- writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
- }
- else
- writer = null;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void collect(Object key, Object val) throws IOException {
- if (writer != null)
- writer.write(key, val);
- else {
- try {
- taskCtx.output().write(key, val);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e);
- }
- }
- }
-
- /**
- * Close writer.
- *
- * @throws IOException In case of IO exception.
- */
- public void closeWriter() throws IOException {
- if (writer != null)
- writer.close(Reporter.NULL);
- }
-
- /**
- * Setup task.
- *
- * @throws IOException If failed.
- */
- public void setup() throws IOException {
- if (writer != null)
- jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
- }
-
- /**
- * Commit task.
- *
- * @throws IOException In failed.
- */
- public void commit() throws IOException {
- if (writer != null) {
- OutputCommitter outputCommitter = jobConf.getOutputCommitter();
-
- TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
-
- if (outputCommitter.needsTaskCommit(taskCtx))
- outputCommitter.commitTask(taskCtx);
- }
- }
-
- /**
- * Abort task.
- */
- public void abort() {
- try {
- if (writer != null)
- jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
- }
- catch (IOException ignore) {
- // No-op.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
deleted file mode 100644
index 688ccef..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Hadoop partitioner adapter for v1 API.
- */
-public class GridHadoopV1Partitioner implements GridHadoopPartitioner {
- /** Partitioner instance. */
- private Partitioner<Object, Object> part;
-
- /**
- * @param cls Hadoop partitioner class.
- * @param conf Job configuration.
- */
- public GridHadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
- part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
- }
-
- /** {@inheritDoc} */
- @Override public int partition(Object key, Object val, int parts) {
- return part.getPartition(key, val, parts);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
deleted file mode 100644
index 3aca637..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-/**
- * Hadoop reduce task implementation for v1 API.
- */
-public class GridHadoopV1ReduceTask extends GridHadoopV1Task {
- /** {@code True} if reduce, {@code false} if combine. */
- private final boolean reduce;
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- * @param reduce {@code True} if reduce, {@code false} if combine.
- */
- public GridHadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
- super(taskInfo);
-
- this.reduce = reduce;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopJob job = taskCtx.job();
-
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- JobConf jobConf = ctx.jobConf();
-
- GridHadoopTaskInput input = taskCtx.input();
-
- GridHadoopV1OutputCollector collector = null;
-
- try {
- collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
-
- Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(),
- jobConf);
-
- assert reducer != null;
-
- try {
- try {
- while (input.next()) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Reduce task cancelled.");
-
- reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
- }
- }
- finally {
- reducer.close();
- }
- }
- finally {
- collector.closeWriter();
- }
-
- collector.commit();
- }
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
deleted file mode 100644
index 791ccdc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-/**
- * Hadoop reporter implementation for v1 API.
- */
-public class GridHadoopV1Reporter implements Reporter {
- /** Context. */
- private final GridHadoopTaskContext ctx;
-
- /**
- * Creates new instance.
- *
- * @param ctx Context.
- */
- public GridHadoopV1Reporter(GridHadoopTaskContext ctx) {
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public void setStatus(String status) {
- // TODO
- }
-
- /** {@inheritDoc} */
- @Override public Counters.Counter getCounter(Enum<?> name) {
- return getCounter(name.getDeclaringClass().getName(), name.name());
- }
-
- /** {@inheritDoc} */
- @Override public Counters.Counter getCounter(String grp, String name) {
- return new GridHadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
- }
-
- /** {@inheritDoc} */
- @Override public void incrCounter(Enum<?> key, long amount) {
- getCounter(key).increment(amount);
- }
-
- /** {@inheritDoc} */
- @Override public void incrCounter(String grp, String cntr, long amount) {
- getCounter(grp, cntr).increment(amount);
- }
-
- /** {@inheritDoc} */
- @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
- throw new UnsupportedOperationException("reporter has no input"); // TODO
- }
-
- /** {@inheritDoc} */
- @Override public float getProgress() {
- return 0.5f; // TODO
- }
-
- /** {@inheritDoc} */
- @Override public void progress() {
- // TODO
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
deleted file mode 100644
index c7dc3fd..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-
-/**
- * Hadoop setup task implementation for v1 API.
- */
-public class GridHadoopV1SetupTask extends GridHadoopV1Task {
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- */
- public GridHadoopV1SetupTask(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /** {@inheritDoc} */
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- try {
- ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
-
- OutputCommitter committer = ctx.jobConf().getOutputCommitter();
-
- if (committer != null)
- committer.setupJob(ctx.jobContext());
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
deleted file mode 100644
index 0e1fb44..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop API v1 splitter.
- */
-public class GridHadoopV1Splitter {
- /** */
- private static final String[] EMPTY_HOSTS = {};
-
- /**
- * @param jobConf Job configuration.
- * @return Collection of mapped splits.
- * @throws IgniteCheckedException If mapping failed.
- */
- public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
- try {
- InputFormat<?, ?> format = jobConf.getInputFormat();
-
- assert format != null;
-
- InputSplit[] splits = format.getSplits(jobConf, 0);
-
- Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length);
-
- for (int i = 0; i < splits.length; i++) {
- InputSplit nativeSplit = splits[i];
-
- if (nativeSplit instanceof FileSplit) {
- FileSplit s = (FileSplit)nativeSplit;
-
- res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
- }
- else
- res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
- }
-
- return res;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * @param clsName Input split class name.
- * @param in Input stream.
- * @param hosts Optional hosts.
- * @return File block or {@code null} if it is not a {@link FileSplit} instance.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
- @Nullable String[] hosts) throws IgniteCheckedException {
- if (!FileSplit.class.getName().equals(clsName))
- return null;
-
- FileSplit split = U.newInstance(FileSplit.class);
-
- try {
- split.readFields(in);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- if (hosts == null)
- hosts = EMPTY_HOSTS;
-
- return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
deleted file mode 100644
index 305bc4e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.text.*;
-
-/**
- * Extended Hadoop v1 task.
- */
-public abstract class GridHadoopV1Task extends GridHadoopTask {
- /** Indicates that this task is to be cancelled. */
- private volatile boolean cancelled;
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- */
- protected GridHadoopV1Task(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /**
- * Gets file name for that task result.
- *
- * @return File name.
- */
- public String fileName() {
- NumberFormat numFormat = NumberFormat.getInstance();
-
- numFormat.setMinimumIntegerDigits(5);
- numFormat.setGroupingUsed(false);
-
- return "part-" + numFormat.format(info().taskNumber());
- }
-
- /**
- *
- * @param jobConf Job configuration.
- * @param taskCtx Task context.
- * @param directWrite Direct write flag.
- * @param fileName File name.
- * @param attempt Attempt of task.
- * @return Collector.
- * @throws IOException In case of IO exception.
- */
- protected GridHadoopV1OutputCollector collector(JobConf jobConf, GridHadoopV2TaskContext taskCtx,
- boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
- GridHadoopV1OutputCollector collector = new GridHadoopV1OutputCollector(jobConf, taskCtx, directWrite,
- fileName, attempt) {
- /** {@inheritDoc} */
- @Override public void collect(Object key, Object val) throws IOException {
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- super.collect(key, val);
- }
- };
-
- collector.setup();
-
- return collector;
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- cancelled = true;
- }
-
- /** Returns true if task is cancelled. */
- public boolean isCancelled() {
- return cancelled;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
new file mode 100644
index 0000000..85f08be
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop cleanup task implementation for v1 API.
+ */
+public class HadoopV1CleanupTask extends HadoopV1Task {
+ /** Abort flag. */
+ private final boolean abort;
+
+ /**
+ * @param taskInfo Task info.
+ * @param abort Abort flag.
+ */
+ public HadoopV1CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
+ super(taskInfo);
+
+ this.abort = abort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobContext jobCtx = ctx.jobContext();
+
+ try {
+ OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
+
+ if (abort)
+ committer.abortJob(jobCtx, JobStatus.State.FAILED);
+ else
+ committer.commitJob(jobCtx);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
new file mode 100644
index 0000000..609297b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
+
+/**
+ * Hadoop counter implementation for v1 API.
+ */
+public class HadoopV1Counter extends Counters.Counter {
+ /** Delegate. */
+ private final HadoopLongCounter cntr;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntr Delegate counter.
+ */
+ public HadoopV1Counter(HadoopLongCounter cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return cntr.name();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return getName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getValue() {
+ return cntr.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setValue(long val) {
+ cntr.value(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void increment(long incr) {
+ cntr.increment(incr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String makeEscapedCompactString() {
+ return toEscapedCompactString(new HadoopV2Counter(cntr));
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public boolean contentEquals(Counters.Counter cntr) {
+ return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCounter() {
+ return cntr.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter getUnderlyingCounter() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
new file mode 100644
index 0000000..51856d6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+/**
+ * Hadoop map task implementation for v1 API.
+ */
+public class HadoopV1MapTask extends HadoopV1Task {
+ /** */
+ private static final String[] EMPTY_HOSTS = new String[0];
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo
+ */
+ public HadoopV1MapTask(GridHadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ GridHadoopJob job = taskCtx.job();
+
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobConf jobConf = ctx.jobConf();
+
+ InputFormat inFormat = jobConf.getInputFormat();
+
+ GridHadoopInputSplit split = info().inputSplit();
+
+ InputSplit nativeSplit;
+
+ if (split instanceof GridHadoopFileBlock) {
+ GridHadoopFileBlock block = (GridHadoopFileBlock)split;
+
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+
+ assert nativeSplit != null;
+
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
+
+ HadoopV1OutputCollector collector = null;
+
+ try {
+ collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), ctx.attemptId());
+
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
+
+ try {
+ try {
+ while (reader.next(key, val)) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
+ }
+ finally {
+ mapper.close();
+ }
+ }
+ finally {
+ collector.closeWriter();
+ }
+
+ collector.commit();
+ }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
new file mode 100644
index 0000000..ac23bb3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Hadoop output collector.
+ */
+public class HadoopV1OutputCollector implements OutputCollector {
+ /** Job configuration. */
+ private final JobConf jobConf;
+
+ /** Task context. */
+ private final GridHadoopTaskContext taskCtx;
+
+ /** Optional direct writer. */
+ private final RecordWriter writer;
+
+ /** Task attempt. */
+ private final TaskAttemptID attempt;
+
+ /**
+ * @param jobConf Job configuration.
+ * @param taskCtx Task context.
+ * @param directWrite Direct write flag.
+ * @param fileName File name.
+ * @throws IOException In case of IO exception.
+ */
+ HadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite,
+ @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+ this.jobConf = jobConf;
+ this.taskCtx = taskCtx;
+ this.attempt = attempt;
+
+ if (directWrite) {
+ jobConf.set("mapreduce.task.attempt.id", attempt.toString());
+
+ OutputFormat outFormat = jobConf.getOutputFormat();
+
+ writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
+ }
+ else
+ writer = null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void collect(Object key, Object val) throws IOException {
+ if (writer != null)
+ writer.write(key, val);
+ else {
+ try {
+ taskCtx.output().write(key, val);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Close writer.
+ *
+ * @throws IOException In case of IO exception.
+ */
+ public void closeWriter() throws IOException {
+ if (writer != null)
+ writer.close(Reporter.NULL);
+ }
+
+ /**
+ * Setup task.
+ *
+ * @throws IOException If failed.
+ */
+ public void setup() throws IOException {
+ if (writer != null)
+ jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
+ }
+
+ /**
+ * Commit task.
+ *
+ * @throws IOException In failed.
+ */
+ public void commit() throws IOException {
+ if (writer != null) {
+ OutputCommitter outputCommitter = jobConf.getOutputCommitter();
+
+ TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
+
+ if (outputCommitter.needsTaskCommit(taskCtx))
+ outputCommitter.commitTask(taskCtx);
+ }
+ }
+
+ /**
+ * Abort task.
+ */
+ public void abort() {
+ try {
+ if (writer != null)
+ jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
+ }
+ catch (IOException ignore) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
new file mode 100644
index 0000000..36fdd55
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Hadoop partitioner adapter for v1 API.
+ */
+public class HadoopV1Partitioner implements GridHadoopPartitioner {
+ /** Partitioner instance. */
+ private Partitioner<Object, Object> part;
+
+ /**
+ * @param cls Hadoop partitioner class.
+ * @param conf Job configuration.
+ */
+ public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
+ part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key, Object val, int parts) {
+ return part.getPartition(key, val, parts);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
new file mode 100644
index 0000000..b5c6bfa
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+/**
+ * Hadoop reduce task implementation for v1 API.
+ */
+public class HadoopV1ReduceTask extends HadoopV1Task {
+ /** {@code True} if reduce, {@code false} if combine. */
+ private final boolean reduce;
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ * @param reduce {@code True} if reduce, {@code false} if combine.
+ */
+ public HadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
+ super(taskInfo);
+
+ this.reduce = reduce;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ GridHadoopJob job = taskCtx.job();
+
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobConf jobConf = ctx.jobConf();
+
+ GridHadoopTaskInput input = taskCtx.input();
+
+ HadoopV1OutputCollector collector = null;
+
+ try {
+ collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+
+ Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
+ try {
+ try {
+ while (input.next()) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
+ }
+ finally {
+ reducer.close();
+ }
+ }
+ finally {
+ collector.closeWriter();
+ }
+
+ collector.commit();
+ }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
new file mode 100644
index 0000000..db4e159
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+
+/**
+ * Hadoop reporter implementation for v1 API.
+ */
+public class HadoopV1Reporter implements Reporter {
+ /** Context. */
+ private final GridHadoopTaskContext ctx;
+
+ /**
+ * Creates new instance.
+ *
+ * @param ctx Context.
+ */
+ public HadoopV1Reporter(GridHadoopTaskContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setStatus(String status) {
+ // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters.Counter getCounter(Enum<?> name) {
+ return getCounter(name.getDeclaringClass().getName(), name.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters.Counter getCounter(String grp, String name) {
+ return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrCounter(Enum<?> key, long amount) {
+ getCounter(key).increment(amount);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrCounter(String grp, String cntr, long amount) {
+ getCounter(grp, cntr).increment(amount);
+ }
+
+ /** {@inheritDoc} */
+ @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("reporter has no input"); // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getProgress() {
+ return 0.5f; // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public void progress() {
+ // TODO
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
new file mode 100644
index 0000000..c427774
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop setup task implementation for v1 API.
+ */
+public class HadoopV1SetupTask extends HadoopV1Task {
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ public HadoopV1SetupTask(GridHadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ try {
+ ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
+
+ OutputCommitter committer = ctx.jobConf().getOutputCommitter();
+
+ if (committer != null)
+ committer.setupJob(ctx.jobContext());
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
new file mode 100644
index 0000000..0d89082
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop API v1 splitter.
+ */
+public class HadoopV1Splitter {
+ /** */
+ private static final String[] EMPTY_HOSTS = {};
+
+ /**
+ * @param jobConf Job configuration.
+ * @return Collection of mapped splits.
+ * @throws IgniteCheckedException If mapping failed.
+ */
+ public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
+ try {
+ InputFormat<?, ?> format = jobConf.getInputFormat();
+
+ assert format != null;
+
+ InputSplit[] splits = format.getSplits(jobConf, 0);
+
+ Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length);
+
+ for (int i = 0; i < splits.length; i++) {
+ InputSplit nativeSplit = splits[i];
+
+ if (nativeSplit instanceof FileSplit) {
+ FileSplit s = (FileSplit)nativeSplit;
+
+ res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
+ }
+ else
+ res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+ }
+
+ return res;
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * @param clsName Input split class name.
+ * @param in Input stream.
+ * @param hosts Optional hosts.
+ * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
+ @Nullable String[] hosts) throws IgniteCheckedException {
+ if (!FileSplit.class.getName().equals(clsName))
+ return null;
+
+ FileSplit split = U.newInstance(FileSplit.class);
+
+ try {
+ split.readFields(in);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ if (hosts == null)
+ hosts = EMPTY_HOSTS;
+
+ return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
new file mode 100644
index 0000000..71a259c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.text.*;
+
+/**
+ * Extended Hadoop v1 task.
+ */
+public abstract class HadoopV1Task extends GridHadoopTask {
+ /** Indicates that this task is to be cancelled. */
+ private volatile boolean cancelled;
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ protected HadoopV1Task(GridHadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /**
+ * Gets file name for that task result.
+ *
+ * @return File name.
+ */
+ public String fileName() {
+ NumberFormat numFormat = NumberFormat.getInstance();
+
+ numFormat.setMinimumIntegerDigits(5);
+ numFormat.setGroupingUsed(false);
+
+ return "part-" + numFormat.format(info().taskNumber());
+ }
+
+ /**
+ *
+ * @param jobConf Job configuration.
+ * @param taskCtx Task context.
+ * @param directWrite Direct write flag.
+ * @param fileName File name.
+ * @param attempt Attempt of task.
+ * @return Collector.
+ * @throws IOException In case of IO exception.
+ */
+ protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
+ boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+ HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
+ fileName, attempt) {
+ /** {@inheritDoc} */
+ @Override public void collect(Object key, Object val) throws IOException {
+ if (cancelled)
+ throw new HadoopTaskCancelledException("Task cancelled.");
+
+ super.collect(key, val);
+ }
+ };
+
+ collector.setup();
+
+ return collector;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ cancelled = true;
+ }
+
+ /** Returns true if task is cancelled. */
+ public boolean isCancelled() {
+ return cancelled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
deleted file mode 100644
index 36b40a2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Split serialized in external file.
- */
-public class GridHadoopExternalSplit extends GridHadoopInputSplit {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long off;
-
- /**
- * For {@link Externalizable}.
- */
- public GridHadoopExternalSplit() {
- // No-op.
- }
-
- /**
- * @param hosts Hosts.
- * @param off Offset of this split in external file.
- */
- public GridHadoopExternalSplit(String[] hosts, long off) {
- assert off >= 0 : off;
- assert hosts != null;
-
- this.hosts = hosts;
- this.off = off;
- }
-
- /**
- * @return Offset of this input split in external file.
- */
- public long offset() {
- return off;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(off);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- off = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridHadoopExternalSplit that = (GridHadoopExternalSplit) o;
-
- return off == that.off;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return (int)(off ^ (off >>> 32));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
deleted file mode 100644
index 5ef4759..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.*;
-import org.apache.hadoop.conf.*;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class GridHadoopNativeCodeLoader {
- /**
- * Check if native-hadoop code is loaded for this platform.
- *
- * @return <code>true</code> if native-hadoop is loaded,
- * else <code>false</code>
- */
- public static boolean isNativeCodeLoaded() {
- return false;
- }
-
- /**
- * Returns true only if this build was compiled with support for snappy.
- */
- public static boolean buildSupportsSnappy() {
- return false;
- }
-
- /**
- * @return Library name.
- */
- public static String getLibraryName() {
- throw new IllegalStateException();
- }
-
- /**
- * Return if native hadoop libraries, if present, can be used for this job.
- * @param conf configuration
- *
- * @return <code>true</code> if native hadoop libraries, if present, can be
- * used for this job; <code>false</code> otherwise.
- */
- public boolean getLoadNativeLibraries(Configuration conf) {
- return false;
- }
-
- /**
- * Set if native hadoop libraries, if present, can be used for this job.
- *
- * @param conf configuration
- * @param loadNativeLibraries can native hadoop libraries be loaded
- */
- public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
- // No-op.
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
deleted file mode 100644
index 0f38548..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.io.serializer.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * The wrapper around external serializer.
- */
-public class GridHadoopSerializationWrapper<T> implements GridHadoopSerialization {
- /** External serializer - writer. */
- private final Serializer<T> serializer;
-
- /** External serializer - reader. */
- private final Deserializer<T> deserializer;
-
- /** Data output for current write operation. */
- private OutputStream currOut;
-
- /** Data input for current read operation. */
- private InputStream currIn;
-
- /** Wrapper around current output to provide OutputStream interface. */
- private final OutputStream outStream = new OutputStream() {
- /** {@inheritDoc} */
- @Override public void write(int b) throws IOException {
- currOut.write(b);
- }
-
- /** {@inheritDoc} */
- @Override public void write(byte[] b, int off, int len) throws IOException {
- currOut.write(b, off, len);
- }
- };
-
- /** Wrapper around current input to provide InputStream interface. */
- private final InputStream inStream = new InputStream() {
- /** {@inheritDoc} */
- @Override public int read() throws IOException {
- return currIn.read();
- }
-
- /** {@inheritDoc} */
- @Override public int read(byte[] b, int off, int len) throws IOException {
- return currIn.read(b, off, len);
- }
- };
-
- /**
- * @param serialization External serializer to wrap.
- * @param cls The class to serialize.
- */
- public GridHadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
- assert cls != null;
-
- serializer = serialization.getSerializer(cls);
- deserializer = serialization.getDeserializer(cls);
-
- try {
- serializer.open(outStream);
- deserializer.open(inStream);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
- assert out != null;
- assert obj != null;
-
- try {
- currOut = (OutputStream)out;
-
- serializer.serialize((T)obj);
-
- currOut = null;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
- assert in != null;
-
- try {
- currIn = (InputStream)in;
-
- T res = deserializer.deserialize((T) obj);
-
- currIn = null;
-
- return res;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- try {
- serializer.close();
- deserializer.close();
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
deleted file mode 100644
index 48558fc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Fake manager for shutdown hooks.
- */
-public class GridHadoopShutdownHookManager {
- /** */
- private static final GridHadoopShutdownHookManager MGR = new GridHadoopShutdownHookManager();
-
- /**
- * Return <code>ShutdownHookManager</code> singleton.
- *
- * @return <code>ShutdownHookManager</code> singleton.
- */
- public static GridHadoopShutdownHookManager get() {
- return MGR;
- }
-
- /** */
- private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());
-
- /** */
- private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
-
- /**
- * Singleton.
- */
- private GridHadoopShutdownHookManager() {
- // No-op.
- }
-
- /**
- * Adds a shutdownHook with a priority, the higher the priority
- * the earlier will run. ShutdownHooks with same priority run
- * in a non-deterministic order.
- *
- * @param shutdownHook shutdownHook <code>Runnable</code>
- * @param priority priority of the shutdownHook.
- */
- public void addShutdownHook(Runnable shutdownHook, int priority) {
- if (shutdownHook == null)
- throw new IllegalArgumentException("shutdownHook cannot be NULL");
-
- hooks.add(shutdownHook);
- }
-
- /**
- * Removes a shutdownHook.
- *
- * @param shutdownHook shutdownHook to remove.
- * @return TRUE if the shutdownHook was registered and removed,
- * FALSE otherwise.
- */
- public boolean removeShutdownHook(Runnable shutdownHook) {
- return hooks.remove(shutdownHook);
- }
-
- /**
- * Indicates if a shutdownHook is registered or not.
- *
- * @param shutdownHook shutdownHook to check if registered.
- * @return TRUE/FALSE depending if the shutdownHook is is registered.
- */
- public boolean hasShutdownHook(Runnable shutdownHook) {
- return hooks.contains(shutdownHook);
- }
-
- /**
- * Indicates if shutdown is in progress or not.
- *
- * @return TRUE if the shutdown is in progress, otherwise FALSE.
- */
- public boolean isShutdownInProgress() {
- return shutdownInProgress.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
deleted file mode 100644
index 57edfa9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * The wrapper for native hadoop input splits.
- *
- * Warning!! This class must not depend on any Hadoop classes directly or indirectly.
- */
-public class GridHadoopSplitWrapper extends GridHadoopInputSplit {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Native hadoop input split. */
- private byte[] bytes;
-
- /** */
- private String clsName;
-
- /** Internal ID */
- private int id;
-
- /**
- * Creates new split wrapper.
- */
- public GridHadoopSplitWrapper() {
- // No-op.
- }
-
- /**
- * Creates new split wrapper.
- *
- * @param id Split ID.
- * @param clsName Class name.
- * @param bytes Serialized class.
- * @param hosts Hosts where split is located.
- */
- public GridHadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) {
- assert hosts != null;
- assert clsName != null;
- assert bytes != null;
-
- this.hosts = hosts;
- this.id = id;
-
- this.clsName = clsName;
- this.bytes = bytes;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(id);
-
- out.writeUTF(clsName);
- U.writeByteArray(out, bytes);
- }
-
- /**
- * @return Class name.
- */
- public String className() {
- return clsName;
- }
-
- /**
- * @return Class bytes.
- */
- public byte[] bytes() {
- return bytes;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- id = in.readInt();
-
- clsName = in.readUTF();
- bytes = U.readByteArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridHadoopSplitWrapper that = (GridHadoopSplitWrapper)o;
-
- return id == that.id;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return id;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
deleted file mode 100644
index 38be3da..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-
-import java.io.*;
-
-/**
- * Hadoop cleanup task (commits or aborts job).
- */
-public class GridHadoopV2CleanupTask extends GridHadoopV2Task {
- /** Abort flag. */
- private final boolean abort;
-
- /**
- * @param taskInfo Task info.
- * @param abort Abort flag.
- */
- public GridHadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
- super(taskInfo);
-
- this.abort = abort;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException {
- JobContextImpl jobCtx = taskCtx.jobContext();
-
- try {
- OutputFormat outputFormat = getOutputFormat(jobCtx);
-
- OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
-
- if (committer != null) {
- if (abort)
- committer.abortJob(jobCtx, JobStatus.State.FAILED);
- else
- committer.commitJob(jobCtx);
- }
- }
- catch (ClassNotFoundException | IOException e) {
- throw new IgniteCheckedException(e);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
deleted file mode 100644
index 9964d91..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.task.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks.
- */
-public class GridHadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
- /** Input reader to overriding of GridHadoopTaskContext input. */
- private RecordReader reader;
-
- /** Output writer to overriding of GridHadoopTaskContext output. */
- private RecordWriter writer;
-
- /** Output is provided by executor environment. */
- private final GridHadoopTaskOutput output;
-
- /** Input is provided by executor environment. */
- private final GridHadoopTaskInput input;
-
- /** Unique identifier for a task attempt. */
- private final TaskAttemptID taskAttemptID;
-
- /** Indicates that this task is to be cancelled. */
- private volatile boolean cancelled;
-
- /** Input split. */
- private InputSplit inputSplit;
-
- /** */
- private final GridHadoopTaskContext ctx;
-
- /** */
- private String status;
-
- /**
- * @param ctx Context for IO operations.
- */
- public GridHadoopV2Context(GridHadoopV2TaskContext ctx) {
- super(ctx.jobConf(), ctx.jobContext().getJobID());
-
- taskAttemptID = ctx.attemptId();
-
- conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString());
- conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString());
-
- output = ctx.output();
- input = ctx.input();
-
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public InputSplit getInputSplit() {
- if (inputSplit == null) {
- GridHadoopInputSplit split = ctx.taskInfo().inputSplit();
-
- if (split == null)
- return null;
-
- if (split instanceof GridHadoopFileBlock) {
- GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split;
-
- inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
- }
- else if (split instanceof GridHadoopExternalSplit)
- throw new UnsupportedOperationException(); // TODO
- else if (split instanceof GridHadoopSplitWrapper)
- inputSplit = (InputSplit) HadoopUtils.unwrapSplit((GridHadoopSplitWrapper) split);
- else
- throw new IllegalStateException();
- }
-
- return inputSplit;
- }
-
- /** {@inheritDoc} */
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- return reader.nextKeyValue();
- }
-
- /** {@inheritDoc} */
- @Override public Object getCurrentKey() throws IOException, InterruptedException {
- if (reader != null)
- return reader.getCurrentKey();
-
- return input.key();
- }
-
- /** {@inheritDoc} */
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
- return reader.getCurrentValue();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void write(Object key, Object val) throws IOException, InterruptedException {
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- if (writer != null)
- writer.write(key, val);
- else {
- try {
- output.write(key, val);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public OutputCommitter getOutputCommitter() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public TaskAttemptID getTaskAttemptID() {
- return taskAttemptID;
- }
-
- /** {@inheritDoc} */
- @Override public void setStatus(String msg) {
- status = msg;
- }
-
- /** {@inheritDoc} */
- @Override public String getStatus() {
- return status;
- }
-
- /** {@inheritDoc} */
- @Override public float getProgress() {
- return 0.5f; // TODO
- }
-
- /** {@inheritDoc} */
- @Override public Counter getCounter(Enum<?> cntrName) {
- return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name());
- }
-
- /** {@inheritDoc} */
- @Override public Counter getCounter(String grpName, String cntrName) {
- return new GridHadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class));
- }
-
- /** {@inheritDoc} */
- @Override public void progress() {
- // No-op.
- }
-
- /**
- * Overrides default input data reader.
- *
- * @param reader New reader.
- */
- public void reader(RecordReader reader) {
- this.reader = reader;
- }
-
- /** {@inheritDoc} */
- @Override public boolean nextKey() throws IOException, InterruptedException {
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- return input.next();
- }
-
- /** {@inheritDoc} */
- @Override public Iterable getValues() throws IOException, InterruptedException {
- return new Iterable() {
- @Override public Iterator iterator() {
- return input.values();
- }
- };
- }
-
- /**
- * @return Overridden output data writer.
- */
- public RecordWriter writer() {
- return writer;
- }
-
- /**
- * Overrides default output data writer.
- *
- * @param writer New writer.
- */
- public void writer(RecordWriter writer) {
- this.writer = writer;
- }
-
- /**
- * Cancels the task by stop the IO.
- */
- public void cancel() {
- cancelled = true;
- }
-}