You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:29 UTC
[18/51] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
new file mode 100644
index 0000000..c21e494
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+
+/**
+ * Listener for Hadoop communication events: received messages and lost connections.
+ */
+public interface HadoopMessageListener {
+ /**
+ * @param desc Process descriptor.
+ * @param msg Hadoop message that was received.
+ */
+ public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg);
+
+ /**
+ * Called when the connection to a remote process is lost.
+ *
+ * @param desc Process descriptor.
+ */
+ public void onConnectionLost(HadoopProcessDescriptor desc);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
new file mode 100644
index 0000000..c4d1c54
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Communication client that sends Hadoop messages through a {@link GridNioSession}.
+ */
+public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunicationClient {
+ /** NIO session; {@code null} only when the no-arg test constructor was used. */
+ private final GridNioSession ses;
+
+ /**
+ * Constructor for test purposes only; leaves the session {@code null}, which the other methods dereference without a null check.
+ */
+ public HadoopTcpNioCommunicationClient() {
+ ses = null;
+ }
+
+ /**
+ * @param ses Session, must not be {@code null} (checked by an assertion).
+ */
+ public HadoopTcpNioCommunicationClient(GridNioSession ses) {
+ assert ses != null;
+
+ this.ses = ses;
+ }
+
+ /** {@inheritDoc} The session is closed only if {@code super.close()} returned {@code true}. */
+ @Override public boolean close() {
+ boolean res = super.close();
+
+ if (res)
+ ses.close();
+
+ return res;
+ }
+
+ /** {@inheritDoc} The session is closed unconditionally after {@code super.forceClose()}. */
+ @Override public void forceClose() {
+ super.forceClose();
+
+ ses.close();
+ }
+
+ /** {@inheritDoc} {@code desc} is not used here; the send future is inspected only if it is already done, so a later failure is not reported by this call. */
+ @Override public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg)
+ throws IgniteCheckedException {
+ if (closed())
+ throw new IgniteCheckedException("Client was closed: " + this);
+
+ GridNioFuture<?> fut = ses.send(msg);
+
+ if (fut.isDone()) {
+ try {
+ fut.get();
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} Computed as the smallest of the times elapsed since the last receive, the last send scheduling and the last send. */
+ @Override public long getIdleTime() {
+ long now = U.currentTimeMillis();
+
+ // Session can be used for receiving and sending, so the most recent activity of any kind defines the idle time.
+ return Math.min(Math.min(now - ses.lastReceiveTime(), now - ses.lastSendScheduleTime()),
+ now - ses.lastSendTime());
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTcpNioCommunicationClient.class, this, super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java
deleted file mode 100644
index 99ee9b77..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-
-/**
- * Hadoop cleanup task implementation for v1 API: commits or aborts the job through the job's {@code OutputCommitter}.
- */
-public class GridHadoopV1CleanupTask extends GridHadoopV1Task {
- /** Abort flag: when {@code true} the job is aborted, otherwise it is committed. */
- private final boolean abort;
-
- /**
- * @param taskInfo Task info.
- * @param abort Abort flag: {@code true} to abort the job, {@code false} to commit it.
- */
- public GridHadoopV1CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
- super(taskInfo);
-
- this.abort = abort;
- }
-
- /** {@inheritDoc} {@code taskCtx} is cast to {@code GridHadoopV2TaskContext} without a type check; {@code IOException} is wrapped into {@code IgniteCheckedException}. */
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- JobContext jobCtx = ctx.jobContext();
-
- try {
- OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
-
- if (abort)
- committer.abortJob(jobCtx, JobStatus.State.FAILED);
- else
- committer.commitJob(jobCtx);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java
deleted file mode 100644
index b986d3e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-
-import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
-
-/**
- * Hadoop counter implementation for v1 API that delegates name and value to a {@code GridHadoopLongCounter}.
- */
-public class GridHadoopV1Counter extends Counters.Counter {
- /** Delegate counter that holds the name and the value. */
- private final GridHadoopLongCounter cntr;
-
- /**
- * Creates new instance.
- *
- * @param cntr Delegate counter.
- */
- public GridHadoopV1Counter(GridHadoopLongCounter cntr) {
- this.cntr = cntr;
- }
-
- /** {@inheritDoc} */
- @Override public void setDisplayName(String displayName) {
- // No-op: the display name is always the counter name, see getDisplayName().
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return cntr.name();
- }
-
- /** {@inheritDoc} Same as {@link #getName()}. */
- @Override public String getDisplayName() {
- return getName();
- }
-
- /** {@inheritDoc} */
- @Override public long getValue() {
- return cntr.value();
- }
-
- /** {@inheritDoc} */
- @Override public void setValue(long val) {
- cntr.value(val);
- }
-
- /** {@inheritDoc} */
- @Override public void increment(long incr) {
- cntr.increment(incr);
- }
-
- /** {@inheritDoc} Not supported: always throws {@code UnsupportedOperationException}. */
- @Override public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} Not supported: always throws {@code UnsupportedOperationException}. */
- @Override public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} Built from a {@code GridHadoopV2Counter} wrapping the same delegate. */
- @Override public String makeEscapedCompactString() {
- return toEscapedCompactString(new GridHadoopV2Counter(cntr));
- }
-
- /** {@inheritDoc} Compares {@code this} (see {@link #getUnderlyingCounter()}) with the underlying counter of the argument using {@code equals}. */
- @SuppressWarnings("deprecation")
- @Override public boolean contentEquals(Counters.Counter cntr) {
- return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
- }
-
- /** {@inheritDoc} Returns the delegate's value, same as {@link #getValue()}. */
- @Override public long getCounter() {
- return cntr.value();
- }
-
- /** {@inheritDoc} Returns {@code this}. */
- @Override public Counter getUnderlyingCounter() {
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
deleted file mode 100644
index 878b61b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-/**
- * Hadoop map task implementation for v1 API: reads records of one input split and feeds them to the job's mapper.
- */
-public class GridHadoopV1MapTask extends GridHadoopV1Task {
- /** Empty host list passed to the {@code FileSplit} built from a file block. */
- private static final String[] EMPTY_HOSTS = new String[0];
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- */
- public GridHadoopV1MapTask(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /** {@inheritDoc} Output is written directly only when the job has neither combiner nor reducer. NOTE(review): the {@code RecordReader} created here is not closed in this method - confirm that this is intended. */
- @SuppressWarnings("unchecked")
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopJob job = taskCtx.job();
-
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- JobConf jobConf = ctx.jobConf();
-
- InputFormat inFormat = jobConf.getInputFormat();
-
- GridHadoopInputSplit split = info().inputSplit();
-
- InputSplit nativeSplit;
-
- if (split instanceof GridHadoopFileBlock) {
- GridHadoopFileBlock block = (GridHadoopFileBlock)split;
-
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
- }
- else
- nativeSplit = (InputSplit)ctx.getNativeSplit(split);
-
- assert nativeSplit != null;
-
- Reporter reporter = new GridHadoopV1Reporter(taskCtx);
-
- GridHadoopV1OutputCollector collector = null;
-
- try {
- collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
- fileName(), ctx.attemptId());
-
- RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
-
- Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
-
- Object key = reader.createKey();
- Object val = reader.createValue();
-
- assert mapper != null;
-
- try {
- try {
- while (reader.next(key, val)) {
- if (isCancelled())
- throw new GridHadoopTaskCancelledException("Map task cancelled.");
-
- mapper.map(key, val, collector, reporter);
- }
- }
- finally {
- mapper.close();
- }
- }
- finally {
- collector.closeWriter();
- }
-
- collector.commit();
- }
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
deleted file mode 100644
index 2a38684..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Hadoop v1 output collector that writes either through a direct {@code RecordWriter} or to the task context output.
- */
-public class GridHadoopV1OutputCollector implements OutputCollector {
- /** Job configuration. */
- private final JobConf jobConf;
-
- /** Task context; its output receives the records when there is no direct writer. */
- private final GridHadoopTaskContext taskCtx;
-
- /** Optional direct writer; {@code null} unless direct write was requested. */
- private final RecordWriter writer;
-
- /** Task attempt used for the task attempt contexts in setup, commit and abort. */
- private final TaskAttemptID attempt;
-
- /**
- * @param jobConf Job configuration.
- * @param taskCtx Task context.
- * @param directWrite Direct write flag; when {@code true} a record writer is obtained from the job's output format.
- * @param fileName File name passed to the output format when a direct writer is created.
- * @throws IOException In case of IO exception.
- */
- GridHadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite,
- @Nullable String fileName, TaskAttemptID attempt) throws IOException {
- this.jobConf = jobConf;
- this.taskCtx = taskCtx;
- this.attempt = attempt;
-
- if (directWrite) {
- jobConf.set("mapreduce.task.attempt.id", attempt.toString());
-
- OutputFormat outFormat = jobConf.getOutputFormat();
-
- writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
- }
- else
- writer = null;
- }
-
- /** {@inheritDoc} Writes to the direct writer if present, otherwise to the task output; {@code IgniteCheckedException} is rethrown as {@code IOException}. */
- @SuppressWarnings("unchecked")
- @Override public void collect(Object key, Object val) throws IOException {
- if (writer != null)
- writer.write(key, val);
- else {
- try {
- taskCtx.output().write(key, val);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e);
- }
- }
- }
-
- /**
- * Closes the direct writer, if there is one.
- *
- * @throws IOException In case of IO exception.
- */
- public void closeWriter() throws IOException {
- if (writer != null)
- writer.close(Reporter.NULL);
- }
-
- /**
- * Sets up the task through the output committer; does nothing when there is no direct writer.
- *
- * @throws IOException If failed.
- */
- public void setup() throws IOException {
- if (writer != null)
- jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
- }
-
- /**
- * Commits the task through the output committer if it reports that a commit is needed; does nothing when there is no direct writer.
- *
- * @throws IOException If failed.
- */
- public void commit() throws IOException {
- if (writer != null) {
- OutputCommitter outputCommitter = jobConf.getOutputCommitter();
-
- TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
-
- if (outputCommitter.needsTaskCommit(taskCtx))
- outputCommitter.commitTask(taskCtx);
- }
- }
-
- /**
- * Aborts the task through the output committer when there is a direct writer; an {@code IOException} from the committer is ignored.
- */
- public void abort() {
- try {
- if (writer != null)
- jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
- }
- catch (IOException ignore) {
- // No-op: the exception is deliberately ignored.
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
deleted file mode 100644
index 688ccef..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Hadoop partitioner adapter for v1 API: wraps a Hadoop {@code Partitioner} created reflectively.
- */
-public class GridHadoopV1Partitioner implements GridHadoopPartitioner {
- /** Partitioner instance all calls are delegated to. */
- private Partitioner<Object, Object> part;
-
- /**
- * @param cls Hadoop partitioner class to instantiate via {@code ReflectionUtils.newInstance}.
- * @param conf Job configuration passed to the new partitioner instance.
- */
- public GridHadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
- part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
- }
-
- /** {@inheritDoc} Delegates to {@code Partitioner.getPartition}. */
- @Override public int partition(Object key, Object val, int parts) {
- return part.getPartition(key, val, parts);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
deleted file mode 100644
index 7deea90..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-/**
- * Hadoop reduce task implementation for v1 API; the same class runs either the job's reducer or its combiner.
- */
-public class GridHadoopV1ReduceTask extends GridHadoopV1Task {
- /** {@code True} if reduce, {@code false} if combine. */
- private final boolean reduce;
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- * @param reduce {@code True} if reduce, {@code false} if combine.
- */
- public GridHadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
- super(taskInfo);
-
- this.reduce = reduce;
- }
-
- /** {@inheritDoc} Output is written directly when this is a reduce step or the job has no reducer; on any exception the collector is aborted and the exception is wrapped. */
- @SuppressWarnings("unchecked")
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopJob job = taskCtx.job();
-
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- JobConf jobConf = ctx.jobConf();
-
- GridHadoopTaskInput input = taskCtx.input();
-
- GridHadoopV1OutputCollector collector = null;
-
- try {
- collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
-
- Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(),
- jobConf);
-
- assert reducer != null;
-
- try {
- try {
- while (input.next()) {
- if (isCancelled())
- throw new GridHadoopTaskCancelledException("Reduce task cancelled.");
-
- reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
- }
- }
- finally {
- reducer.close();
- }
- }
- finally {
- collector.closeWriter();
- }
-
- collector.commit();
- }
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
deleted file mode 100644
index 1abb2c0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-/**
- * Hadoop reporter implementation for v1 API; only counters are backed by the task context, the rest are stubs.
- */
-public class GridHadoopV1Reporter implements Reporter {
- /** Task context that counters are obtained from. */
- private final GridHadoopTaskContext ctx;
-
- /**
- * Creates new instance.
- *
- * @param ctx Context.
- */
- public GridHadoopV1Reporter(GridHadoopTaskContext ctx) {
- this.ctx = ctx;
- }
-
- /** {@inheritDoc} */
- @Override public void setStatus(String status) {
- // TODO: not implemented, the status is currently ignored.
- }
-
- /** {@inheritDoc} The enum's declaring class name is used as the group and the constant name as the counter name. */
- @Override public Counters.Counter getCounter(Enum<?> name) {
- return getCounter(name.getDeclaringClass().getName(), name.name());
- }
-
- /** {@inheritDoc} Returns a new {@code GridHadoopV1Counter} wrapper around the context's long counter on every call. */
- @Override public Counters.Counter getCounter(String grp, String name) {
- return new GridHadoopV1Counter(ctx.counter(grp, name, GridHadoopLongCounter.class));
- }
-
- /** {@inheritDoc} */
- @Override public void incrCounter(Enum<?> key, long amount) {
- getCounter(key).increment(amount);
- }
-
- /** {@inheritDoc} */
- @Override public void incrCounter(String grp, String cntr, long amount) {
- getCounter(grp, cntr).increment(amount);
- }
-
- /** {@inheritDoc} Not supported: always throws {@code UnsupportedOperationException}. */
- @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
- throw new UnsupportedOperationException("reporter has no input"); // TODO: input split is not exposed by this reporter yet.
- }
-
- /** {@inheritDoc} */
- @Override public float getProgress() {
- return 0.5f; // TODO: placeholder constant, real progress is not tracked.
- }
-
- /** {@inheritDoc} */
- @Override public void progress() {
- // TODO: not implemented, progress notifications are currently ignored.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
deleted file mode 100644
index c7dc3fd..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-
-/**
- * Hadoop setup task implementation for v1 API: checks the output specs and sets up the job through the output committer.
- */
-public class GridHadoopV1SetupTask extends GridHadoopV1Task {
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- */
- public GridHadoopV1SetupTask(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /** {@inheritDoc} {@code taskCtx} is cast to {@code GridHadoopV2TaskContext} without a type check; job setup is skipped when the committer is {@code null}. */
- @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
- try {
- ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
-
- OutputCommitter committer = ctx.jobConf().getOutputCommitter();
-
- if (committer != null)
- committer.setupJob(ctx.jobContext());
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
deleted file mode 100644
index 257f4ea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop API v1 splitter.
- */
-public class GridHadoopV1Splitter {
- /** */
- private static final String[] EMPTY_HOSTS = {};
-
- /**
- * @param jobConf Job configuration.
- * @return Collection of mapped splits.
- * @throws IgniteCheckedException If mapping failed.
- */
- public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
- try {
- InputFormat<?, ?> format = jobConf.getInputFormat();
-
- assert format != null;
-
- InputSplit[] splits = format.getSplits(jobConf, 0);
-
- Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length);
-
- for (int i = 0; i < splits.length; i++) {
- InputSplit nativeSplit = splits[i];
-
- if (nativeSplit instanceof FileSplit) {
- FileSplit s = (FileSplit)nativeSplit;
-
- res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
- }
- else
- res.add(GridHadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
- }
-
- return res;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /**
- * @param clsName Input split class name.
- * @param in Input stream.
- * @param hosts Optional hosts.
- * @return File block or {@code null} if it is not a {@link FileSplit} instance.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
- @Nullable String[] hosts) throws IgniteCheckedException {
- if (!FileSplit.class.getName().equals(clsName))
- return null;
-
- FileSplit split = U.newInstance(FileSplit.class);
-
- try {
- split.readFields(in);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
-
- if (hosts == null)
- hosts = EMPTY_HOSTS;
-
- return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
deleted file mode 100644
index 86a7264..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.text.*;
-
-/**
- * Extended Hadoop v1 task.
- */
-public abstract class GridHadoopV1Task extends GridHadoopTask {
- /** Indicates that this task is to be cancelled. */
- private volatile boolean cancelled;
-
- /**
- * Constructor.
- *
- * @param taskInfo Task info.
- */
- protected GridHadoopV1Task(GridHadoopTaskInfo taskInfo) {
- super(taskInfo);
- }
-
- /**
- * Gets file name for that task result.
- *
- * @return File name.
- */
- public String fileName() {
- NumberFormat numFormat = NumberFormat.getInstance();
-
- numFormat.setMinimumIntegerDigits(5);
- numFormat.setGroupingUsed(false);
-
- return "part-" + numFormat.format(info().taskNumber());
- }
-
- /**
- *
- * @param jobConf Job configuration.
- * @param taskCtx Task context.
- * @param directWrite Direct write flag.
- * @param fileName File name.
- * @param attempt Attempt of task.
- * @return Collector.
- * @throws IOException In case of IO exception.
- */
- protected GridHadoopV1OutputCollector collector(JobConf jobConf, GridHadoopV2TaskContext taskCtx,
- boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
- GridHadoopV1OutputCollector collector = new GridHadoopV1OutputCollector(jobConf, taskCtx, directWrite,
- fileName, attempt) {
- /** {@inheritDoc} */
- @Override public void collect(Object key, Object val) throws IOException {
- if (cancelled)
- throw new GridHadoopTaskCancelledException("Task cancelled.");
-
- super.collect(key, val);
- }
- };
-
- collector.setup();
-
- return collector;
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- cancelled = true;
- }
-
- /** Returns true if task is cancelled. */
- public boolean isCancelled() {
- return cancelled;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
new file mode 100644
index 0000000..fa570ea
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop cleanup task implementation for v1 API.
+ */
+public class HadoopV1CleanupTask extends HadoopV1Task {
+ /** Abort flag. */
+ private final boolean abort;
+
+ /**
+ * @param taskInfo Task info.
+ * @param abort Abort flag.
+ */
+ public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) {
+ super(taskInfo);
+
+ this.abort = abort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobContext jobCtx = ctx.jobContext();
+
+ try {
+ OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
+
+ if (abort)
+ committer.abortJob(jobCtx, JobStatus.State.FAILED);
+ else
+ committer.commitJob(jobCtx);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
new file mode 100644
index 0000000..609297b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
+
+/**
+ * Hadoop counter implementation for v1 API.
+ */
+public class HadoopV1Counter extends Counters.Counter {
+ /** Delegate. */
+ private final HadoopLongCounter cntr;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntr Delegate counter.
+ */
+ public HadoopV1Counter(HadoopLongCounter cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return cntr.name();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return getName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getValue() {
+ return cntr.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setValue(long val) {
+ cntr.value(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void increment(long incr) {
+ cntr.increment(incr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public String makeEscapedCompactString() {
+ return toEscapedCompactString(new HadoopV2Counter(cntr));
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("deprecation")
+ @Override public boolean contentEquals(Counters.Counter cntr) {
+ return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCounter() {
+ return cntr.value();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter getUnderlyingCounter() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
new file mode 100644
index 0000000..ad7b058
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+/**
+ * Hadoop map task implementation for v1 API.
+ */
+public class HadoopV1MapTask extends HadoopV1Task {
+ /** Empty hosts array used when recreating a native file split from a file block. */
+ private static final String[] EMPTY_HOSTS = new String[0];
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopJob job = taskCtx.job();
+
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobConf jobConf = ctx.jobConf();
+
+ InputFormat inFormat = jobConf.getInputFormat();
+
+ HadoopInputSplit split = info().inputSplit();
+
+ InputSplit nativeSplit;
+
+ if (split instanceof HadoopFileBlock) {
+ HadoopFileBlock block = (HadoopFileBlock)split;
+
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+
+ assert nativeSplit != null;
+
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
+
+ HadoopV1OutputCollector collector = null;
+
+ try {
+ collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), ctx.attemptId());
+
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
+
+ try {
+ try {
+ while (reader.next(key, val)) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
+ }
+ finally {
+ mapper.close();
+ }
+ }
+ finally {
+ collector.closeWriter();
+ }
+
+ collector.commit();
+ }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
new file mode 100644
index 0000000..348274d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Hadoop output collector implementation for v1 API.
+ */
+public class HadoopV1OutputCollector implements OutputCollector {
+ /** Job configuration. */
+ private final JobConf jobConf;
+
+ /** Task context. */
+ private final HadoopTaskContext taskCtx;
+
+ /** Optional direct writer. */
+ private final RecordWriter writer;
+
+ /** Task attempt. */
+ private final TaskAttemptID attempt;
+
+ /**
+ * @param jobConf Job configuration.
+ * @param taskCtx Task context.
+ * @param directWrite Direct write flag.
+ * @param fileName File name.
+ * @throws IOException In case of IO exception.
+ */
+ HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
+ @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+ this.jobConf = jobConf;
+ this.taskCtx = taskCtx;
+ this.attempt = attempt;
+
+ if (directWrite) {
+ jobConf.set("mapreduce.task.attempt.id", attempt.toString());
+
+ OutputFormat outFormat = jobConf.getOutputFormat();
+
+ writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
+ }
+ else
+ writer = null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void collect(Object key, Object val) throws IOException {
+ if (writer != null)
+ writer.write(key, val);
+ else {
+ try {
+ taskCtx.output().write(key, val);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Close writer.
+ *
+ * @throws IOException In case of IO exception.
+ */
+ public void closeWriter() throws IOException {
+ if (writer != null)
+ writer.close(Reporter.NULL);
+ }
+
+ /**
+ * Setup task.
+ *
+ * @throws IOException If failed.
+ */
+ public void setup() throws IOException {
+ if (writer != null)
+ jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
+ }
+
+ /**
+ * Commit task.
+ *
+ * @throws IOException If failed.
+ */
+ public void commit() throws IOException {
+ if (writer != null) {
+ OutputCommitter outputCommitter = jobConf.getOutputCommitter();
+
+ TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
+
+ if (outputCommitter.needsTaskCommit(taskCtx))
+ outputCommitter.commitTask(taskCtx);
+ }
+ }
+
+ /**
+ * Abort task.
+ */
+ public void abort() {
+ try {
+ if (writer != null)
+ jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
+ }
+ catch (IOException ignore) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
new file mode 100644
index 0000000..e45f92b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Hadoop partitioner adapter for v1 API.
+ */
+public class HadoopV1Partitioner implements HadoopPartitioner {
+ /** Partitioner instance. */
+ private Partitioner<Object, Object> part;
+
+ /**
+ * @param cls Hadoop partitioner class.
+ * @param conf Job configuration.
+ */
+ public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
+ part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(Object key, Object val, int parts) {
+ return part.getPartition(key, val, parts);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
new file mode 100644
index 0000000..18ee09d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+/**
+ * Hadoop reduce task implementation for v1 API.
+ */
+public class HadoopV1ReduceTask extends HadoopV1Task {
+ /** {@code True} if reduce, {@code false} if combine. */
+ private final boolean reduce;
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ * @param reduce {@code True} if reduce, {@code false} if combine.
+ */
+ public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
+ super(taskInfo);
+
+ this.reduce = reduce;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopJob job = taskCtx.job();
+
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ JobConf jobConf = ctx.jobConf();
+
+ HadoopTaskInput input = taskCtx.input();
+
+ HadoopV1OutputCollector collector = null;
+
+ try {
+ collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+
+ Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
+ try {
+ try {
+ while (input.next()) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
+ }
+ finally {
+ reducer.close();
+ }
+ }
+ finally {
+ collector.closeWriter();
+ }
+
+ collector.commit();
+ }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
new file mode 100644
index 0000000..d799373
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+
+/**
+ * Hadoop reporter implementation for v1 API.
+ */
+public class HadoopV1Reporter implements Reporter {
+ /** Context. */
+ private final HadoopTaskContext ctx;
+
+ /**
+ * Creates new instance.
+ *
+ * @param ctx Context.
+ */
+ public HadoopV1Reporter(HadoopTaskContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setStatus(String status) {
+ // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters.Counter getCounter(Enum<?> name) {
+ return getCounter(name.getDeclaringClass().getName(), name.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counters.Counter getCounter(String grp, String name) {
+ return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrCounter(Enum<?> key, long amount) {
+ getCounter(key).increment(amount);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrCounter(String grp, String cntr, long amount) {
+ getCounter(grp, cntr).increment(amount);
+ }
+
+ /** {@inheritDoc} */
+ @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("reporter has no input"); // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getProgress() {
+ return 0.5f; // TODO
+ }
+
+ /** {@inheritDoc} */
+ @Override public void progress() {
+ // TODO
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
new file mode 100644
index 0000000..a758f1d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop setup task implementation for v1 API.
+ */
+public class HadoopV1SetupTask extends HadoopV1Task {
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ public HadoopV1SetupTask(HadoopTaskInfo taskInfo) {
+ super(taskInfo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+ try {
+ ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
+
+ OutputCommitter committer = ctx.jobConf().getOutputCommitter();
+
+ if (committer != null)
+ committer.setupJob(ctx.jobContext());
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
new file mode 100644
index 0000000..9eebbb8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Splitter for jobs written against the Hadoop v1 ({@code mapred}) API.
+ */
+public class HadoopV1Splitter {
+    /** Host list used when the caller supplies none. */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * Asks the input format of the job for its splits and maps each of them to a {@link HadoopInputSplit}.
+     * A {@link FileSplit} becomes a {@link HadoopFileBlock}; any other split is wrapped together with its
+     * position in the array returned by the input format.
+     *
+     * @param jobConf Job configuration.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> inFmt = jobConf.getInputFormat();
+
+            assert inFmt != null;
+
+            InputSplit[] nativeSplits = inFmt.getSplits(jobConf, 0);
+
+            Collection<HadoopInputSplit> mapped = new ArrayList<>(nativeSplits.length);
+
+            int idx = 0;
+
+            for (InputSplit nativeSplit : nativeSplits) {
+                if (!(nativeSplit instanceof FileSplit))
+                    mapped.add(HadoopUtils.wrapSplit(idx, nativeSplit, nativeSplit.getLocations()));
+                else {
+                    FileSplit fileSplit = (FileSplit)nativeSplit;
+
+                    mapped.add(new HadoopFileBlock(fileSplit.getLocations(), fileSplit.getPath().toUri(),
+                        fileSplit.getStart(), fileSplit.getLength()));
+                }
+
+                idx++;
+            }
+
+            return mapped;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Reads a {@link FileSplit} from the stream and converts it to a file block.
+     *
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts; an empty array is used when {@code null}.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
+        @Nullable String[] hosts) throws IgniteCheckedException {
+        boolean isFileSplit = FileSplit.class.getName().equals(clsName);
+
+        if (!isFileSplit)
+            return null;
+
+        FileSplit fileSplit = U.newInstance(FileSplit.class);
+
+        try {
+            fileSplit.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        String[] blockHosts = hosts != null ? hosts : EMPTY_HOSTS;
+
+        return new HadoopFileBlock(blockHosts, fileSplit.getPath().toUri(), fileSplit.getStart(),
+            fileSplit.getLength());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
new file mode 100644
index 0000000..b7da700
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.text.*;
+
+/**
+ * Base class for Hadoop v1 tasks that adds cancellation support and output collector creation.
+ */
+public abstract class HadoopV1Task extends HadoopTask {
+    /** Set by {@link #cancel()}; checked by the collector before every collected pair. */
+    private volatile boolean cancelled;
+
+    /**
+     * Creates the task.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV1Task(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /**
+     * Builds the name of the file holding the result of this task: {@code "part-"} followed by the
+     * task number formatted with at least five integer digits and no grouping.
+     *
+     * @return File name.
+     */
+    public String fileName() {
+        NumberFormat fmt = NumberFormat.getInstance();
+
+        fmt.setGroupingUsed(false);
+        fmt.setMinimumIntegerDigits(5);
+
+        String taskNum = fmt.format(info().taskNumber());
+
+        return "part-" + taskNum;
+    }
+
+    /**
+     * Creates an output collector that refuses to collect once this task has been cancelled, and
+     * sets the collector up before returning it.
+     *
+     * @param jobConf Job configuration.
+     * @param taskCtx Task context.
+     * @param directWrite Direct write flag.
+     * @param fileName File name.
+     * @param attempt Attempt of task.
+     * @return Collector.
+     * @throws IOException In case of IO exception.
+     */
+    protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
+        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        HadoopV1OutputCollector res = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
+            fileName, attempt) {
+            /** {@inheritDoc} */
+            @Override public void collect(Object key, Object val) throws IOException {
+                if (!cancelled) {
+                    super.collect(key, val);
+
+                    return;
+                }
+
+                throw new HadoopTaskCancelledException("Task cancelled.");
+            }
+        };
+
+        res.setup();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+    }
+
+    /**
+     * Tells whether {@link #cancel()} has been called on this task.
+     *
+     * @return {@code true} if the task is cancelled.
+     */
+    public boolean isCancelled() {
+        return cancelled;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
deleted file mode 100644
index 36b40a2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Split serialized in external file.
- * <p>
- * Only the offset is part of the serialized form, of {@code equals} and of {@code hashCode};
- * the hosts passed to the constructor are not written by {@code writeExternal}.
- */
-public class GridHadoopExternalSplit extends GridHadoopInputSplit {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Offset of this split in the external file. */
- private long off;
-
- /**
- * For {@link Externalizable}.
- */
- public GridHadoopExternalSplit() {
- // No-op.
- }
-
- /**
- * @param hosts Hosts.
- * @param off Offset of this split in external file.
- */
- public GridHadoopExternalSplit(String[] hosts, long off) {
- assert off >= 0 : off;
- assert hosts != null;
-
- // 'hosts' is not declared in this class; presumably a field of the parent class.
- this.hosts = hosts;
- this.off = off;
- }
-
- /**
- * @return Offset of this input split in external file.
- */
- public long offset() {
- return off;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- // Only the offset is written.
- out.writeLong(off);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- // Mirrors writeExternal: only the offset is restored.
- off = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- GridHadoopExternalSplit that = (GridHadoopExternalSplit) o;
-
- // Splits are equal when their offsets are equal; hosts are not compared.
- return off == that.off;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- // Folds the upper and lower 32 bits of the offset into an int.
- return (int)(off ^ (off >>> 32));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
deleted file mode 100644
index 5ef4759..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.*;
-import org.apache.hadoop.conf.*;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- * <p>
- * Every query method in this class returns a constant: nothing is ever loaded.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class GridHadoopNativeCodeLoader {
- /**
- * Check if native-hadoop code is loaded for this platform.
- *
- * @return <code>true</code> if native-hadoop is loaded,
- * else <code>false</code>; this implementation always returns <code>false</code>.
- */
- public static boolean isNativeCodeLoaded() {
- return false;
- }
-
- /**
- * Returns true only if this build was compiled with support for snappy.
- *
- * @return Always <code>false</code> in this implementation.
- */
- public static boolean buildSupportsSnappy() {
- return false;
- }
-
- /**
- * @return Library name; this implementation never returns normally.
- * @throws IllegalStateException Always.
- */
- public static String getLibraryName() {
- throw new IllegalStateException();
- }
-
- /**
- * Return if native hadoop libraries, if present, can be used for this job.
- * @param conf configuration (not read by this implementation)
- *
- * @return <code>true</code> if native hadoop libraries, if present, can be
- * used for this job; <code>false</code> otherwise. This implementation always
- * returns <code>false</code>.
- */
- public boolean getLoadNativeLibraries(Configuration conf) {
- return false;
- }
-
- /**
- * Set if native hadoop libraries, if present, can be used for this job.
- * Both arguments are ignored by this implementation.
- *
- * @param conf configuration
- * @param loadNativeLibraries can native hadoop libraries be loaded
- */
- public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
- // No-op.
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
deleted file mode 100644
index 0f38548..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.io.serializer.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * The wrapper around external serializer.
- * <p>
- * The wrapped serializer and deserializer are opened once, in the constructor, on internal proxy
- * streams. Each {@code write} / {@code read} call points the proxy at the stream supplied by the
- * caller for the duration of that call.
- *
- * @param <T> Type handled by the wrapped serializer and deserializer.
- */
-public class GridHadoopSerializationWrapper<T> implements GridHadoopSerialization {
- /** External serializer - writer. */
- private final Serializer<T> serializer;
-
- /** External serializer - reader. */
- private final Deserializer<T> deserializer;
-
- /** Data output for current write operation. */
- private OutputStream currOut;
-
- /** Data input for current read operation. */
- private InputStream currIn;
-
- /** Wrapper around current output to provide OutputStream interface. */
- private final OutputStream outStream = new OutputStream() {
- /** {@inheritDoc} */
- @Override public void write(int b) throws IOException {
- currOut.write(b);
- }
-
- /** {@inheritDoc} */
- @Override public void write(byte[] b, int off, int len) throws IOException {
- currOut.write(b, off, len);
- }
- };
-
- /** Wrapper around current input to provide InputStream interface. */
- private final InputStream inStream = new InputStream() {
- /** {@inheritDoc} */
- @Override public int read() throws IOException {
- return currIn.read();
- }
-
- /** {@inheritDoc} */
- @Override public int read(byte[] b, int off, int len) throws IOException {
- return currIn.read(b, off, len);
- }
- };
-
- /**
- * @param serialization External serializer to wrap.
- * @param cls The class to serialize.
- * @throws IgniteCheckedException If opening the serializer or the deserializer fails with an
- * {@link IOException}.
- */
- public GridHadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
- assert cls != null;
-
- serializer = serialization.getSerializer(cls);
- deserializer = serialization.getDeserializer(cls);
-
- try {
- // Both are bound to the proxy streams, which delegate to currOut / currIn.
- serializer.open(outStream);
- deserializer.open(inStream);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
- assert out != null;
- assert obj != null;
-
- try {
- // The cast throws ClassCastException unless the DataOutput is also an OutputStream.
- currOut = (OutputStream)out;
-
- serializer.serialize((T)obj);
-
- // NOTE(review): not reached when serialize() throws, so the reference stays set until the next call.
- currOut = null;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
- assert in != null;
-
- try {
- // The cast throws ClassCastException unless the DataInput is also an InputStream.
- currIn = (InputStream)in;
-
- T res = deserializer.deserialize((T) obj);
-
- // NOTE(review): not reached when deserialize() throws, so the reference stays set until the next call.
- currIn = null;
-
- return res;
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- try {
- // NOTE(review): if serializer.close() throws, deserializer.close() is skipped.
- serializer.close();
- deserializer.close();
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
- }
-}