You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/02/02 22:57:32 UTC
[49/50] [abbrv] incubator-ignite git commit: ignite-132 - pkg rename
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
index 26b1628..97546bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
-import org.apache.ignite.hadoop.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index bfb199c..5c07dc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.portable.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
@@ -36,7 +37,6 @@ import org.apache.ignite.internal.product.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.authentication.*;
import org.apache.ignite.spi.authentication.noop.*;
-import org.apache.ignite.hadoop.*;
import org.apache.ignite.internal.managers.*;
import org.apache.ignite.internal.managers.checkpoint.*;
import org.apache.ignite.internal.managers.collision.*;
@@ -468,8 +468,9 @@ public class IgniteKernal extends ClusterGroupAdapter implements IgniteEx, Ignit
@Override public Collection<String> getUserAttributesFormatted() {
assert cfg != null;
- return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() {
- @Override public String apply(Map.Entry<String, ?> e) {
+ return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String,?>,String>() {
+ @Override
+ public String apply(Map.Entry<String,?> e) {
return e.getKey() + ", " + e.getValue().toString();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java
new file mode 100644
index 0000000..246c4ed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Hadoop facade providing access to GridGain Hadoop features.
+ */
+public interface GridHadoop {
+ /**
+ * Gets Hadoop module configuration.
+ *
+ * @return Hadoop module configuration.
+ */
+ public GridHadoopConfiguration configuration();
+
+ /**
+ * Generate next job ID.
+ *
+ * @return Next job ID.
+ */
+ public GridHadoopJobId nextJobId();
+
+ /**
+ * Submits job to job tracker.
+ *
+ * @param jobId Job ID to submit.
+ * @param jobInfo Job info to submit.
+ * @return Execution future.
+ */
+ public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo);
+
+ /**
+ * Gets Hadoop job execution status.
+ *
+ * @param jobId Job ID to get status for.
+ * @return Job execution status or {@code null} in case job with the given ID is not found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Returns job counters.
+ *
+ * @param jobId Job ID to get counters for.
+ * @return Job counters object.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Gets Hadoop finish future for particular job.
+ *
+ * @param jobId Job ID.
+ * @return Job finish future or {@code null} in case job with the given ID is not found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Kills job.
+ *
+ * @param jobId Job ID.
+ * @return {@code True} if job was killed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java
new file mode 100644
index 0000000..f66b95a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * Hadoop configuration.
+ */
+public class GridHadoopConfiguration {
+ /** Default finished job info time-to-live. */
+ public static final long DFLT_FINISHED_JOB_INFO_TTL = 10_000;
+
+ /** Default value for external execution flag. */
+ public static final boolean DFLT_EXTERNAL_EXECUTION = false;
+
+ /** Default value for the max parallel tasks. */
+ public static final int DFLT_MAX_PARALLEL_TASKS = Runtime.getRuntime().availableProcessors();
+
+ /** Default value for the max task queue size. */
+ public static final int DFLT_MAX_TASK_QUEUE_SIZE = 1000;
+
+ /** Map reduce planner. */
+ private GridHadoopMapReducePlanner planner;
+
+ /** */
+ private boolean extExecution = DFLT_EXTERNAL_EXECUTION;
+
+ /** Finished job info TTL. */
+ private long finishedJobInfoTtl = DFLT_FINISHED_JOB_INFO_TTL;
+
+ /** */
+ private int maxParallelTasks = DFLT_MAX_PARALLEL_TASKS;
+
+ /** */
+ private int maxTaskQueueSize = DFLT_MAX_TASK_QUEUE_SIZE;
+
+ /**
+ * Default constructor.
+ */
+ public GridHadoopConfiguration() {
+ // No-op.
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param cfg Configuration to copy.
+ */
+ public GridHadoopConfiguration(GridHadoopConfiguration cfg) {
+ // Preserve alphabetic order.
+ extExecution = cfg.isExternalExecution();
+ finishedJobInfoTtl = cfg.getFinishedJobInfoTtl();
+ planner = cfg.getMapReducePlanner();
+ maxParallelTasks = cfg.getMaxParallelTasks();
+ maxTaskQueueSize = cfg.getMaxTaskQueueSize();
+ }
+
+ /**
+ * Gets max number of local tasks that may be executed in parallel.
+ *
+ * @return Max number of local tasks that may be executed in parallel.
+ */
+ public int getMaxParallelTasks() {
+ return maxParallelTasks;
+ }
+
+ /**
+ * Sets max number of local tasks that may be executed in parallel.
+ *
+ * @param maxParallelTasks Max number of local tasks that may be executed in parallel.
+ */
+ public void setMaxParallelTasks(int maxParallelTasks) {
+ this.maxParallelTasks = maxParallelTasks;
+ }
+
+ /**
+ * Gets max task queue size.
+ *
+ * @return Max task queue size.
+ */
+ public int getMaxTaskQueueSize() {
+ return maxTaskQueueSize;
+ }
+
+ /**
+ * Sets max task queue size.
+ *
+ * @param maxTaskQueueSize Max task queue size.
+ */
+ public void setMaxTaskQueueSize(int maxTaskQueueSize) {
+ this.maxTaskQueueSize = maxTaskQueueSize;
+ }
+
+ /**
+ * Gets finished job info time-to-live in milliseconds.
+ *
+ * @return Finished job info time-to-live.
+ */
+ public long getFinishedJobInfoTtl() {
+ return finishedJobInfoTtl;
+ }
+
+ /**
+ * Sets finished job info time-to-live in milliseconds.
+ *
+ * @param finishedJobInfoTtl Finished job info time-to-live.
+ */
+ public void setFinishedJobInfoTtl(long finishedJobInfoTtl) {
+ this.finishedJobInfoTtl = finishedJobInfoTtl;
+ }
+
+ /**
+ * Gets external task execution flag. If {@code true}, hadoop job tasks will be executed in an external
+ * (relative to node) process.
+ *
+ * @return {@code True} if external execution.
+ */
+ public boolean isExternalExecution() {
+ return extExecution;
+ }
+
+ /**
+ * Sets external task execution flag.
+ *
+ * @param extExecution {@code True} if tasks should be executed in an external process.
+ * @see #isExternalExecution()
+ */
+ public void setExternalExecution(boolean extExecution) {
+ this.extExecution = extExecution;
+ }
+
+ /**
+ * Gets Hadoop map-reduce planner, a component which defines job execution plan based on job
+ * configuration and current grid topology.
+ *
+ * @return Map-reduce planner.
+ */
+ public GridHadoopMapReducePlanner getMapReducePlanner() {
+ return planner;
+ }
+
+ /**
+ * Sets Hadoop map-reduce planner, a component which defines job execution plan based on job
+ * configuration and current grid topology.
+ *
+ * @param planner Map-reduce planner.
+ */
+ public void setMapReducePlanner(GridHadoopMapReducePlanner planner) {
+ this.planner = planner;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridHadoopConfiguration.class, this, super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java
new file mode 100644
index 0000000..83902dd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Hadoop counter.
+ */
+public interface GridHadoopCounter {
+ /**
+ * Gets name.
+ *
+ * @return Name of the counter.
+ */
+ public String name();
+
+ /**
+ * Gets counter group.
+ *
+ * @return Counter group's name.
+ */
+ public String group();
+
+ /**
+ * Merges the given counter into this counter.
+ *
+ * @param cntr Counter to merge into this counter.
+ */
+ public void merge(GridHadoopCounter cntr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java
new file mode 100644
index 0000000..af72e69
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * The object that writes some system counters to some storage for each running job. This operation is a part of
+ * the whole statistics collection process.
+ */
+public interface GridHadoopCounterWriter {
+ /**
+ * Writes counters of given job to some statistics storage.
+ *
+ * @param jobInfo Job info.
+ * @param jobId Job id.
+ * @param cntrs Counters.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java
new file mode 100644
index 0000000..91eb8a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.*;
+
+/**
+ * Counters store.
+ */
+public interface GridHadoopCounters {
+ /**
+ * Returns counter for the specified group and counter name. Creates new if it does not exist.
+ *
+ * @param grp Counter group name.
+ * @param name Counter name.
+ * @param cls Class for new instance creation if it's needed.
+ * @return The counter that was found or newly created.
+ */
+ <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls);
+
+ /**
+ * Returns all existing counters.
+ *
+ * @return Collection of counters.
+ */
+ Collection<GridHadoopCounter> all();
+
+ /**
+ * Merges all counters from another store with existing counters.
+ *
+ * @param other Counters to merge with.
+ */
+ void merge(GridHadoopCounters other);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java
new file mode 100644
index 0000000..fae111a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Hadoop file block.
+ */
+public class GridHadoopFileBlock extends GridHadoopInputSplit {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ protected URI file;
+
+ /** */
+ @GridToStringInclude
+ protected long start;
+
+ /** */
+ @GridToStringInclude
+ protected long len;
+
+ /**
+ * Creates new file block.
+ */
+ public GridHadoopFileBlock() {
+ // No-op.
+ }
+
+ /**
+ * Creates new file block.
+ *
+ * @param hosts List of hosts where the block resides.
+ * @param file File URI.
+ * @param start Start position of the block in the file.
+ * @param len Length of the block.
+ */
+ public GridHadoopFileBlock(String[] hosts, URI file, long start, long len) {
+ A.notNull(hosts, "hosts", file, "file");
+
+ this.hosts = hosts;
+ this.file = file;
+ this.start = start;
+ this.len = len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(file());
+ out.writeLong(start());
+ out.writeLong(length());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ file = (URI)in.readObject();
+ start = in.readLong();
+ len = in.readLong();
+ }
+
+ /**
+ * @return Length.
+ */
+ public long length() {
+ return len;
+ }
+
+ /**
+ * @param len New length.
+ */
+ public void length(long len) {
+ this.len = len;
+ }
+
+ /**
+ * @return Start.
+ */
+ public long start() {
+ return start;
+ }
+
+ /**
+ * @param start New start.
+ */
+ public void start(long start) {
+ this.start = start;
+ }
+
+ /**
+ * @return File.
+ */
+ public URI file() {
+ return file;
+ }
+
+ /**
+ * @param file New file.
+ */
+ public void file(URI file) {
+ this.file = file;
+ }
+
+ /**
+ * @param hosts New hosts.
+ */
+ public void hosts(String[] hosts) {
+ A.notNull(hosts, "hosts");
+
+ this.hosts = hosts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof GridHadoopFileBlock))
+ return false;
+
+ GridHadoopFileBlock that = (GridHadoopFileBlock)o;
+
+ return len == that.len && start == that.start && file.equals(that.file);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = file.hashCode();
+
+ res = 31 * res + (int)(start ^ (start >>> 32));
+ res = 31 * res + (int)(len ^ (len >>> 32));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(GridHadoopFileBlock.class, this, "hosts", Arrays.toString(hosts));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java
new file mode 100644
index 0000000..e68a6f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.io.*;
+
+/**
+ * Abstract fragment of an input data source.
+ */
+public abstract class GridHadoopInputSplit implements Externalizable {
+ /** */
+ protected String[] hosts;
+
+ /**
+ * Array of hosts where this input split resides.
+ *
+ * @return Hosts.
+ */
+ public String[] hosts() {
+ assert hosts != null;
+
+ return hosts;
+ }
+
+ /**
+ * This method must be implemented for the purposes of the internal implementation.
+ *
+ * @param obj Another object.
+ * @return {@code true} If objects are equal.
+ */
+ @Override public abstract boolean equals(Object obj);
+
+ /**
+ * This method must be implemented for the purposes of the internal implementation.
+ *
+ * @return Hash code of the object.
+ */
+ @Override public abstract int hashCode();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java
new file mode 100644
index 0000000..a1ec9a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Hadoop job.
+ */
+public interface GridHadoopJob {
+ /**
+ * Gets job ID.
+ *
+ * @return Job ID.
+ */
+ public GridHadoopJobId id();
+
+ /**
+ * Gets job information.
+ *
+ * @return Job information.
+ */
+ public GridHadoopJobInfo info();
+
+ /**
+ * Gets collection of input splits for this job.
+ *
+ * @return Input splits.
+ */
+ public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException;
+
+ /**
+ * Returns context for task execution.
+ *
+ * @param info Task info.
+ * @return Task Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException;
+
+ /**
+ * Does all the needed initialization for the job. Will be called on each node where tasks for this job must
+ * be executed.
+ * <p>
+ * If job is running in external mode this method will be called on instance in GridGain node with parameter
+ * {@code false} and on instance in external process with parameter {@code true}.
+ *
+ * @param external If {@code true} then this job instance resides in external process.
+ * @param locNodeId Local node ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException;
+
+ /**
+ * Release all the resources.
+ * <p>
+ * If job is running in external mode this method will be called on instance in GridGain node with parameter
+ * {@code false} and on instance in external process with parameter {@code true}.
+ *
+ * @param external If {@code true} then this job instance resides in external process.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void dispose(boolean external) throws IgniteCheckedException;
+
+ /**
+ * Prepare local environment for the task.
+ *
+ * @param info Task info.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException;
+
+ /**
+ * Cleans up local environment of the task.
+ *
+ * @param info Task info.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException;
+
+ /**
+ * Cleans up the job staging directory.
+ */
+ void cleanupStagingDirectory();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
new file mode 100644
index 0000000..5d0a9c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Job ID.
+ */
+public class GridHadoopJobId implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private int jobId;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public GridHadoopJobId() {
+ // No-op.
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param jobId Job ID.
+ */
+ public GridHadoopJobId(UUID nodeId, int jobId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ }
+
+ public UUID globalId() {
+ return nodeId;
+ }
+
+ public int localId() {
+ return jobId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeUuid(out, nodeId);
+ out.writeInt(jobId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ nodeId = U.readUuid(in);
+ jobId = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GridHadoopJobId that = (GridHadoopJobId) o;
+
+ if (jobId != that.jobId)
+ return false;
+
+ if (!nodeId.equals(that.nodeId))
+ return false;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * nodeId.hashCode() + jobId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return nodeId + "_" + jobId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java
new file mode 100644
index 0000000..9a891f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Compact job description.
+ */
+public interface GridHadoopJobInfo extends Serializable {
+ /**
+ * Gets optional configuration property for the job.
+ *
+ * @param name Property name.
+ * @return Value or {@code null} if none.
+ */
+ @Nullable public String property(String name);
+
+ /**
+ * Checks whether job has combiner.
+ *
+ * @return {@code true} If job has combiner.
+ */
+ public boolean hasCombiner();
+
+ /**
+ * Checks whether job has reducer.
+ * Actual number of reducers will be in {@link GridHadoopMapReducePlan#reducers()}.
+ *
+ * @return {@code true} If job has reducer.
+ */
+ public boolean hasReducer();
+
+ /**
+ * Creates new job instance for the given ID.
+ * {@link GridHadoopJobInfo} is reusable for multiple jobs while {@link GridHadoopJob} is for one job execution.
+ * This method will be called once for the same ID on one node, though it can be called on the same host
+ * multiple times from different processes (in case of multiple nodes on the same host or external execution).
+ *
+ * @param jobId Job ID.
+ * @param log Logger.
+ * @return Job.
+ * @throws IgniteCheckedException If failed.
+ */
+ GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException;
+
+ /**
+ * Gets number of reducers configured for the job.
+ *
+ * @return Number of reducers configured for job.
+ */
+ public int reducers();
+
+ /**
+ * Gets job name.
+ *
+ * @return Job name.
+ */
+ public String jobName();
+
+ /**
+ * Gets user name.
+ *
+ * @return User name.
+ */
+ public String user();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java
new file mode 100644
index 0000000..cc122bb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Job run phase.
+ */
+public enum GridHadoopJobPhase {
+ /** Job is running the setup task. */
+ PHASE_SETUP,
+
+ /** Job is running map and combine tasks. */
+ PHASE_MAP,
+
+ /** Job has finished all map tasks and is running reduce tasks. */
+ PHASE_REDUCE,
+
+ /** Job is stopping due to an exception during any of the phases. */
+ PHASE_CANCELLING,
+
+ /** Job has finished execution. */
+ PHASE_COMPLETE
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java
new file mode 100644
index 0000000..f324645
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Enumeration of optional properties supported by GridGain for Apache Hadoop.
+ * <p>
+ * The property name of each constant is {@code "gridgain."} followed by the constant name in lower case
+ * with underscores replaced by dots, see {@link #propertyName()}.
+ */
+public enum GridHadoopJobProperty {
+    /**
+     * Initial size for hashmap which stores output of mapper and will be used as input of combiner.
+     * <p>
+     * Setting it right allows to avoid rehashing.
+     */
+    COMBINER_HASHMAP_SIZE,
+
+    /**
+     * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer.
+     * <p>
+     * Setting it right allows to avoid rehashing.
+     */
+    PARTITION_HASHMAP_SIZE,
+
+    /**
+     * Specifies number of concurrently running mappers for external execution mode.
+     * <p>
+     * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
+     */
+    EXTERNAL_CONCURRENT_MAPPERS,
+
+    /**
+     * Specifies number of concurrently running reducers for external execution mode.
+     * <p>
+     * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
+     */
+    EXTERNAL_CONCURRENT_REDUCERS,
+
+    /**
+     * Delay in milliseconds after which GridGain server will reply job status.
+     */
+    JOB_STATUS_POLL_DELAY,
+
+    /**
+     * Size in bytes of single memory page which will be allocated for data structures in shuffle.
+     * <p>
+     * By default is {@code 32 * 1024}.
+     */
+    SHUFFLE_OFFHEAP_PAGE_SIZE,
+
+    /**
+     * If set to {@code true} then input for combiner will not be sorted by key.
+     * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)}
+     * and {@link Object#hashCode()} methods of key must be implemented consistently with
+     * comparator for that type. Grouping comparator is not supported if this setting is {@code true}.
+     * <p>
+     * By default is {@code false}.
+     */
+    SHUFFLE_COMBINER_NO_SORTING,
+
+    /**
+     * If set to {@code true} then input for reducer will not be sorted by key.
+     * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)}
+     * and {@link Object#hashCode()} methods of key must be implemented consistently with
+     * comparator for that type. Grouping comparator is not supported if this setting is {@code true}.
+     * <p>
+     * By default is {@code false}.
+     */
+    SHUFFLE_REDUCER_NO_SORTING;
+
+    /** Property name derived from the constant name. */
+    private final String ptyName;
+
+    /**
+     * Derives the property name from the constant name.
+     */
+    GridHadoopJobProperty() {
+        // Lower-case with the root locale: the default-locale variant is locale-sensitive
+        // (e.g. a Turkish default locale maps 'I' to a dotless i) and would change the property names.
+        ptyName = "gridgain." + name().toLowerCase(java.util.Locale.ROOT).replace('_', '.');
+    }
+
+    /**
+     * @return Property name.
+     */
+    public String propertyName() {
+        return ptyName;
+    }
+
+    /**
+     * Gets string value of the property from the job info.
+     *
+     * @param jobInfo Job info.
+     * @param pty Property.
+     * @param dflt Default value.
+     * @return Property value, or {@code dflt} if the job info returns {@code null} for the property.
+     */
+    public static String get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, @Nullable String dflt) {
+        String res = jobInfo.property(pty.propertyName());
+
+        return res == null ? dflt : res;
+    }
+
+    /**
+     * Gets integer value of the property from the job info.
+     *
+     * @param jobInfo Job info.
+     * @param pty Property.
+     * @param dflt Default value.
+     * @return Property value parsed as {@code int}, or {@code dflt} if the job info returns {@code null}
+     *      for the property.
+     * @throws NumberFormatException If the property value is not a parsable integer.
+     */
+    public static int get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, int dflt) {
+        String res = jobInfo.property(pty.propertyName());
+
+        return res == null ? dflt : Integer.parseInt(res);
+    }
+
+    /**
+     * Gets boolean value of the property from the job info.
+     *
+     * @param jobInfo Job info.
+     * @param pty Property.
+     * @param dflt Default value.
+     * @return Property value parsed with {@link Boolean#parseBoolean(String)}, or {@code dflt} if the job info
+     *      returns {@code null} for the property.
+     */
+    public static boolean get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, boolean dflt) {
+        String res = jobInfo.property(pty.propertyName());
+
+        return res == null ? dflt : Boolean.parseBoolean(res);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java
new file mode 100644
index 0000000..02ea883
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Hadoop job status.
+ * <p>
+ * Holder of job ID, job name, user, mapper and reducer counters, job phase, failed flag and version.
+ * Values are set by the constructor or by {@link #readExternal(ObjectInput)}.
+ */
+public class GridHadoopJobStatus implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ private GridHadoopJobId jobId;
+
+ /** Job name. */
+ private String jobName;
+
+ /** User. */
+ private String usr;
+
+ /** Pending mappers count. */
+ private int pendingMapperCnt;
+
+ /** Pending reducers count. */
+ private int pendingReducerCnt;
+
+ /** Total mappers count. */
+ private int totalMapperCnt;
+
+ /** Total reducers count. */
+ private int totalReducerCnt;
+ /** Job phase. */
+ private GridHadoopJobPhase jobPhase;
+
+ /** Failed flag, returned by {@link #isFailed()}. */
+ private boolean failed;
+
+ /** Version. */
+ private long ver;
+
+ /**
+ * {@link Externalizable} support.
+ */
+ public GridHadoopJobStatus() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param jobId Job ID.
+ * @param jobName Job name.
+ * @param usr User.
+ * @param pendingMapperCnt Pending mappers count.
+ * @param pendingReducerCnt Pending reducers count.
+ * @param totalMapperCnt Total mappers count.
+ * @param totalReducerCnt Total reducers count.
+ * @param jobPhase Job phase.
+ * @param failed Failed.
+ * @param ver Version.
+ */
+ public GridHadoopJobStatus(
+ GridHadoopJobId jobId,
+ String jobName,
+ String usr,
+ int pendingMapperCnt,
+ int pendingReducerCnt,
+ int totalMapperCnt,
+ int totalReducerCnt,
+ GridHadoopJobPhase jobPhase,
+ boolean failed,
+ long ver
+ ) {
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.usr = usr;
+ this.pendingMapperCnt = pendingMapperCnt;
+ this.pendingReducerCnt = pendingReducerCnt;
+ this.totalMapperCnt = totalMapperCnt;
+ this.totalReducerCnt = totalReducerCnt;
+ this.jobPhase = jobPhase;
+ this.failed = failed;
+ this.ver = ver;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Job name.
+ */
+ public String jobName() {
+ return jobName;
+ }
+
+ /**
+ * @return User.
+ */
+ public String user() {
+ return usr;
+ }
+
+ /**
+ * @return Pending mappers count.
+ */
+ public int pendingMapperCnt() {
+ return pendingMapperCnt;
+ }
+
+ /**
+ * @return Pending reducers count.
+ */
+ public int pendingReducerCnt() {
+ return pendingReducerCnt;
+ }
+
+ /**
+ * @return Total mappers count.
+ */
+ public int totalMapperCnt() {
+ return totalMapperCnt;
+ }
+
+ /**
+ * @return Total reducers count.
+ */
+ public int totalReducerCnt() {
+ return totalReducerCnt;
+ }
+
+ /**
+ * @return Version.
+ */
+ public long version() {
+ return ver;
+ }
+
+ /**
+ * @return Job phase.
+ */
+ public GridHadoopJobPhase jobPhase() {
+ return jobPhase;
+ }
+
+ /**
+ * @return {@code true} If the job failed.
+ */
+ public boolean isFailed() {
+ return failed;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridHadoopJobStatus.class, this);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Fields are written in the same order in which {@link #readExternal(ObjectInput)} reads them;
+ * keep both methods in sync when adding or reordering fields.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(jobId);
+ U.writeString(out, jobName);
+ U.writeString(out, usr);
+ out.writeInt(pendingMapperCnt);
+ out.writeInt(pendingReducerCnt);
+ out.writeInt(totalMapperCnt);
+ out.writeInt(totalReducerCnt);
+ out.writeObject(jobPhase);
+ out.writeBoolean(failed);
+ out.writeLong(ver);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Fields are read in the same order in which {@link #writeExternal(ObjectOutput)} writes them.
+ */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = (GridHadoopJobId)in.readObject();
+ jobName = U.readString(in);
+ usr = U.readString(in);
+ pendingMapperCnt = in.readInt();
+ pendingReducerCnt = in.readInt();
+ totalMapperCnt = in.readInt();
+ totalReducerCnt = in.readInt();
+ jobPhase = (GridHadoopJobPhase)in.readObject();
+ failed = in.readBoolean();
+ ver = in.readLong();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java
new file mode 100644
index 0000000..2fd5160
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Map-reduce job execution plan.
+ */
+public interface GridHadoopMapReducePlan extends Serializable {
+ /**
+ * Gets collection of file blocks for which mappers should be executed.
+ *
+ * @param nodeId Node ID to check.
+ * @return Collection of file blocks or {@code null} if no mappers should be executed on given node.
+ */
+ @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId);
+
+ /**
+ * Gets reducer IDs that should be started on given node.
+ *
+ * @param nodeId Node ID to check.
+ * @return Array of reducer IDs, may be {@code null}.
+ */
+ @Nullable public int[] reducers(UUID nodeId);
+
+ /**
+ * Gets collection of all node IDs involved in map part of job execution.
+ *
+ * @return Collection of node IDs.
+ */
+ public Collection<UUID> mapperNodeIds();
+
+ /**
+ * Gets collection of all node IDs involved in reduce part of job execution.
+ *
+ * @return Collection of node IDs.
+ */
+ public Collection<UUID> reducerNodeIds();
+
+ /**
+ * Gets overall number of mappers for the job.
+ *
+ * @return Number of mappers.
+ */
+ public int mappers();
+
+ /**
+ * Gets overall number of reducers for the job.
+ *
+ * @return Number of reducers.
+ */
+ public int reducers();
+
+ /**
+ * Gets node ID for reducer.
+ *
+ * @param reducer Reducer.
+ * @return Node ID.
+ */
+ public UUID nodeForReducer(int reducer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java
new file mode 100644
index 0000000..56c6913
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Map-reduce execution planner.
+ */
+public interface GridHadoopMapReducePlanner {
+ /**
+ * Prepares map-reduce execution plan for the given job and topology.
+ *
+ * @param job Job.
+ * @param top Topology.
+ * @param oldPlan Old plan in case of partial failure, may be {@code null}.
+ * @return Map reduce plan.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top,
+ @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java
new file mode 100644
index 0000000..fcde424
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Partitioner: maps a key and value pair to a partition, which is a reducer index.
+ */
+public interface GridHadoopPartitioner {
+ /**
+ * Gets partition which is actually a reducer index for the given key and value pair.
+ *
+ * @param key Key.
+ * @param val Value.
+ * @param parts Number of partitions.
+ * @return Partition.
+ */
+ public int partition(Object key, Object val, int parts);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java
new file mode 100644
index 0000000..5bc8806
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Hadoop serialization. Not thread safe object, must be created for each thread or correctly synchronized.
+ */
+public interface GridHadoopSerialization extends AutoCloseable {
+ /**
+ * Writes the given object to output.
+ *
+ * @param out Output.
+ * @param obj Object to serialize.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void write(DataOutput out, Object obj) throws IgniteCheckedException;
+
+ /**
+ * Reads object from the given input optionally reusing given instance.
+ *
+ * @param in Input.
+ * @param obj Optional instance to reuse, may be {@code null}.
+ * @return New object or reused instance.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException;
+
+ /**
+ * Finalise the internal objects.
+ * <p>
+ * Narrows {@link AutoCloseable#close()} to throw {@link IgniteCheckedException} only.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public void close() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java
new file mode 100644
index 0000000..2c00811
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+
+/**
+ * Hadoop task.
+ */
+public abstract class GridHadoopTask {
+ /** Task info, returned by {@link #info()}. */
+ private GridHadoopTaskInfo taskInfo;
+
+ /**
+ * Creates task.
+ *
+ * @param taskInfo Task info, must not be {@code null} (checked by assertion).
+ */
+ protected GridHadoopTask(GridHadoopTaskInfo taskInfo) {
+ assert taskInfo != null;
+
+ this.taskInfo = taskInfo;
+ }
+
+ /**
+ * For {@link Externalizable}. Leaves the task info unset.
+ */
+ @SuppressWarnings("ConstructorNotProtectedInAbstractClass")
+ public GridHadoopTask() {
+ // No-op.
+ }
+
+ /**
+ * Gets task info.
+ *
+ * @return Task info.
+ */
+ public GridHadoopTaskInfo info() {
+ return taskInfo;
+ }
+
+ /**
+ * Runs task.
+ *
+ * @param taskCtx Context.
+ * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException;
+
+ /**
+ * Interrupts task execution.
+ */
+ public abstract void cancel();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java
new file mode 100644
index 0000000..bedd93b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Task context.
+ */
+public abstract class GridHadoopTaskContext {
+ /** Job, set once by the constructor. */
+ private final GridHadoopJob job;
+
+ /** Task input, set with {@link #input(GridHadoopTaskInput)}. */
+ private GridHadoopTaskInput input;
+
+ /** Task output, set with {@link #output(GridHadoopTaskOutput)}. */
+ private GridHadoopTaskOutput output;
+
+ /** Task info, can be replaced with {@link #taskInfo(GridHadoopTaskInfo)}. */
+ private GridHadoopTaskInfo taskInfo;
+
+ /**
+ * @param taskInfo Task info.
+ * @param job Job.
+ */
+ protected GridHadoopTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job) {
+ this.taskInfo = taskInfo;
+ this.job = job;
+ }
+
+ /**
+ * Gets task info.
+ *
+ * @return Task info.
+ */
+ public GridHadoopTaskInfo taskInfo() {
+ return taskInfo;
+ }
+
+ /**
+ * Set a new task info.
+ *
+ * @param info Task info.
+ */
+ public void taskInfo(GridHadoopTaskInfo info) {
+ taskInfo = info;
+ }
+
+ /**
+ * Gets task output.
+ *
+ * @return Task output.
+ */
+ public GridHadoopTaskOutput output() {
+ return output;
+ }
+
+ /**
+ * Gets task input.
+ *
+ * @return Task input.
+ */
+ public GridHadoopTaskInput input() {
+ return input;
+ }
+
+ /**
+ * @return Job.
+ */
+ public GridHadoopJob job() {
+ return job;
+ }
+
+ /**
+ * Gets counter for the given name.
+ *
+ * @param grp Counter group's name.
+ * @param name Counter name.
+ * @param cls Counter class; determines the type of the returned counter.
+ * @return Counter.
+ */
+ public abstract <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls);
+
+ /**
+ * Gets all known counters.
+ *
+ * @return Unmodifiable collection of counters.
+ */
+ public abstract GridHadoopCounters counters();
+
+ /**
+ * Sets input of the task.
+ *
+ * @param in Input.
+ */
+ public void input(GridHadoopTaskInput in) {
+ input = in;
+ }
+
+ /**
+ * Sets output of the task.
+ *
+ * @param out Output.
+ */
+ public void output(GridHadoopTaskOutput out) {
+ output = out;
+ }
+
+ /**
+ * Gets partitioner.
+ *
+ * @return Partitioner.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract GridHadoopPartitioner partitioner() throws IgniteCheckedException;
+
+ /**
+ * Gets serializer for keys.
+ *
+ * @return Serializer for keys.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract GridHadoopSerialization keySerialization() throws IgniteCheckedException;
+
+ /**
+ * Gets serializer for values.
+ *
+ * @return Serializer for values.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract GridHadoopSerialization valueSerialization() throws IgniteCheckedException;
+
+ /**
+ * Gets sorting comparator.
+ *
+ * @return Comparator for sorting.
+ */
+ public abstract Comparator<Object> sortComparator();
+
+ /**
+ * Gets comparator for grouping on combine or reduce operation.
+ *
+ * @return Comparator.
+ */
+ public abstract Comparator<Object> groupComparator();
+
+ /**
+ * Execute current task.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void run() throws IgniteCheckedException;
+
+ /**
+ * Cancel current task execution.
+ */
+ public abstract void cancel();
+
+ /**
+ * Prepare local environment for the task.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void prepareTaskEnvironment() throws IgniteCheckedException;
+
+ /**
+ * Cleans up local environment of the task.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java
new file mode 100644
index 0000000..75e06ca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Task info: task type, job id, task number, attempt and an optional input split; externalizable.
+ */
+public class GridHadoopTaskInfo implements Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Task type. */
+ private GridHadoopTaskType type;
+
+ /** Job id. */
+ private GridHadoopJobId jobId;
+
+ /** Task number. */
+ private int taskNum;
+
+ /** Attempt for this task. */
+ private int attempt;
+
+ /** Input split, may be {@code null}. */
+ private GridHadoopInputSplit inputSplit;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public GridHadoopTaskInfo() {
+ // No-op.
+ }
+
+ /**
+ * Creates new task info.
+ *
+ * @param type Task type.
+ * @param jobId Job id.
+ * @param taskNum Task number.
+ * @param attempt Attempt for this task.
+ * @param inputSplit Input split.
+ */
+ public GridHadoopTaskInfo(GridHadoopTaskType type, GridHadoopJobId jobId, int taskNum, int attempt,
+ @Nullable GridHadoopInputSplit inputSplit) {
+ this.type = type;
+ this.jobId = jobId;
+ this.taskNum = taskNum;
+ this.attempt = attempt;
+ this.inputSplit = inputSplit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeByte(type.ordinal()); // Type is written as its ordinal in a single byte; read back via fromOrdinal().
+ out.writeObject(jobId);
+ out.writeInt(taskNum);
+ out.writeInt(attempt);
+ out.writeObject(inputSplit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ type = GridHadoopTaskType.fromOrdinal(in.readByte()); // Fields are read in the same order they are written.
+ jobId = (GridHadoopJobId)in.readObject();
+ taskNum = in.readInt();
+ attempt = in.readInt();
+ inputSplit = (GridHadoopInputSplit)in.readObject();
+ }
+
+ /**
+ * @return Type.
+ */
+ public GridHadoopTaskType type() {
+ return type;
+ }
+
+ /**
+ * @return Job id.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Task number.
+ */
+ public int taskNumber() {
+ return taskNum;
+ }
+
+ /**
+ * @return Attempt.
+ */
+ public int attempt() {
+ return attempt;
+ }
+
+ /**
+ * @return Input split.
+ */
+ @Nullable public GridHadoopInputSplit inputSplit() {
+ return inputSplit;
+ }
+
+ /** {@inheritDoc} Compares type, job id, task number and attempt; the input split is not compared. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof GridHadoopTaskInfo))
+ return false;
+
+ GridHadoopTaskInfo that = (GridHadoopTaskInfo)o;
+
+ return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type;
+ }
+
+ /** {@inheritDoc} Built from the same four fields that {@code equals} compares. */
+ @Override public int hashCode() {
+ int res = type.hashCode();
+
+ res = 31 * res + jobId.hashCode();
+ res = 31 * res + taskNum;
+ res = 31 * res + attempt;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridHadoopTaskInfo.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java
new file mode 100644
index 0000000..479cf6d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Task input.
+ */
+public interface GridHadoopTaskInput extends AutoCloseable {
+ /**
+ * Moves the cursor to the next element.
+ *
+ * @return {@code false} if the input is exhausted.
+ */
+ boolean next();
+
+ /**
+ * Gets current key.
+ *
+ * @return Key.
+ */
+ Object key();
+
+ /**
+ * Gets values for current key.
+ *
+ * @return Iterator over the values.
+ */
+ Iterator<?> values();
+
+ /**
+ * Closes input.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public void close() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java
new file mode 100644
index 0000000..6480d8d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * Task output: a closeable sink that key-value pairs are written to.
+ */
+public interface GridHadoopTaskOutput extends AutoCloseable {
+ /**
+ * Writes key and value to the output; declared to throw {@code IgniteCheckedException}.
+ *
+ * @param key Key.
+ * @param val Value.
+ */
+ public void write(Object key, Object val) throws IgniteCheckedException;
+
+ /**
+ * Closes output.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public void close() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java
new file mode 100644
index 0000000..404d6b8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jetbrains.annotations.*;
+
+/**
+* Task type.
+*/
+public enum GridHadoopTaskType {
+ /** Setup task. */
+ SETUP,
+
+ /** Map task. */
+ MAP,
+
+ /** Reduce task. */
+ REDUCE,
+
+ /** Combine task. */
+ COMBINE,
+
+ /** Commit task. */
+ COMMIT,
+
+ /** Abort task. */
+ ABORT;
+
+ /** Enumerated values, cached once so that lookups do not call {@code values()} each time. */
+ private static final GridHadoopTaskType[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value or {@code null} if the ordinal is negative or not less than the number of values.
+ */
+ @Nullable public static GridHadoopTaskType fromOrdinal(byte ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
index c861ea8..d0ef4ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
-import org.apache.ignite.hadoop.*;
import org.apache.ignite.internal.util.future.*;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
index 05e0e35..c2cf542 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.hadoop.*;
/**
* Hadoop processor.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/package.html
new file mode 100644
index 0000000..6c751ed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/package.html
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+ <!-- Package description. -->
+ Contains Hadoop APIs.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
index 2b286dc..2b8dc10 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client.hadoop;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.*;
@@ -32,7 +33,6 @@ import org.apache.hadoop.security.token.*;
import org.apache.ignite.*;
import org.apache.ignite.client.*;
import org.apache.ignite.client.hadoop.counter.*;
-import org.apache.ignite.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.proto.*;
import org.apache.ignite.internal.util.typedef.internal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
index 29967c7..ff01bf8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolProvider.java
@@ -21,9 +21,9 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.protocol.*;
import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
import org.apache.ignite.client.*;
import org.apache.ignite.client.marshaller.optimized.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
index d5a3e90..9f4ec02 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/counter/GridHadoopClientCounters.java
@@ -19,7 +19,7 @@ package org.apache.ignite.client.hadoop.counter;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.counters.*;
-import org.apache.ignite.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.v2.*;
import org.apache.ignite.internal.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v1/GridGgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v1/GridGgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v1/GridGgfsHadoopFileSystem.java
index 5c28e45..21a5942 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v1/GridGgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v1/GridGgfsHadoopFileSystem.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.internal.fs.common.*;
-import org.apache.ignite.internal.processors.fs.*;
import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.processors.fs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v2/GridGgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v2/GridGgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v2/GridGgfsHadoopFileSystem.java
index 04f2d46..f239471 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v2/GridGgfsHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/fs/hadoop/v2/GridGgfsHadoopFileSystem.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.internal.fs.common.*;
-import org.apache.ignite.internal.processors.fs.*;
import org.apache.ignite.internal.fs.hadoop.*;
+import org.apache.ignite.internal.processors.fs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;