You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/01/17 10:44:56 UTC
[46/50] [abbrv] ignite git commit: IGNITE-4428: Hadoop: moved
HadoopMapReducePlanner and dependent classes to public space. This closes
#1389. This closes #1394.
IGNITE-4428: Hadoop: moved HadoopMapReducePlanner and dependent classes to public space. This closes #1389. This closes #1394.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d14e0727
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d14e0727
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d14e0727
Branch: refs/heads/ignite-3477
Commit: d14e0727b3dd61ab5ec2957133d77dbc25e9ba68
Parents: 77ca2e6
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Jan 16 16:36:25 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Jan 16 16:36:25 2017 +0300
----------------------------------------------------------------------
.../configuration/HadoopConfiguration.java | 2 +-
.../apache/ignite/hadoop/HadoopInputSplit.java | 54 +++++++
.../org/apache/ignite/hadoop/HadoopJob.java | 74 ++++++++++
.../ignite/hadoop/HadoopMapReducePlan.java | 80 +++++++++++
.../ignite/hadoop/HadoopMapReducePlanner.java | 40 ++++++
.../processors/hadoop/HadoopDefaultJobInfo.java | 4 +-
.../processors/hadoop/HadoopFileBlock.java | 1 +
.../processors/hadoop/HadoopInputSplit.java | 54 -------
.../internal/processors/hadoop/HadoopJob.java | 107 --------------
.../internal/processors/hadoop/HadoopJobEx.java | 140 +++++++++++++++++++
.../processors/hadoop/HadoopJobInfo.java | 54 +++----
.../processors/hadoop/HadoopMapReducePlan.java | 80 -----------
.../hadoop/HadoopMapReducePlanner.java | 40 ------
.../processors/hadoop/HadoopTaskContext.java | 6 +-
.../processors/hadoop/HadoopTaskInfo.java | 1 +
.../hadoop/counter/HadoopCounterWriter.java | 4 +-
.../resources/META-INF/classnames.properties | 4 +-
.../fs/IgniteHadoopFileSystemCounterWriter.java | 6 +-
.../IgniteHadoopWeightedMapReducePlanner.java | 10 +-
.../planner/HadoopAbstractMapReducePlanner.java | 118 ++++++++++++++++
.../planner/HadoopTestRoundRobinMrPlanner.java | 75 ++++++++++
.../processors/hadoop/HadoopCommonUtils.java | 1 +
.../processors/hadoop/HadoopContext.java | 2 +
.../processors/hadoop/HadoopExternalSplit.java | 1 +
.../processors/hadoop/HadoopSplitWrapper.java | 1 +
.../HadoopFileSystemCounterWriterDelegate.java | 4 +-
...doopFileSystemCounterWriterDelegateImpl.java | 4 +-
.../hadoop/impl/v1/HadoopV1MapTask.java | 6 +-
.../hadoop/impl/v1/HadoopV1ReduceTask.java | 4 +-
.../hadoop/impl/v1/HadoopV1Splitter.java | 2 +-
.../hadoop/impl/v2/HadoopV2Context.java | 2 +-
.../processors/hadoop/impl/v2/HadoopV2Job.java | 15 +-
.../hadoop/impl/v2/HadoopV2Splitter.java | 2 +-
.../hadoop/impl/v2/HadoopV2TaskContext.java | 6 +-
.../hadoop/jobtracker/HadoopJobMetadata.java | 4 +-
.../hadoop/jobtracker/HadoopJobTracker.java | 32 ++---
.../planner/HadoopAbstractMapReducePlanner.java | 116 ---------------
.../planner/HadoopDefaultMapReducePlan.java | 4 +-
.../hadoop/shuffle/HadoopShuffle.java | 4 +-
.../hadoop/shuffle/HadoopShuffleJob.java | 7 +-
.../HadoopEmbeddedTaskExecutor.java | 8 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 6 +-
.../taskexecutor/HadoopTaskExecutorAdapter.java | 8 +-
.../external/HadoopExternalTaskExecutor.java | 16 +--
.../child/HadoopChildProcessRunner.java | 4 +-
.../resources/META-INF/classnames.properties | 4 +-
.../hadoop/impl/HadoopCommandLineTest.java | 4 +-
.../hadoop/impl/HadoopJobTrackerSelfTest.java | 1 +
.../hadoop/impl/HadoopPlannerMockJob.java | 10 +-
.../hadoop/impl/HadoopTasksAllVersionsTest.java | 16 +--
.../hadoop/impl/HadoopTasksV1Test.java | 4 +-
.../hadoop/impl/HadoopTasksV2Test.java | 4 +-
.../impl/HadoopTestRoundRobinMrPlanner.java | 75 ----------
.../hadoop/impl/HadoopTestTaskContext.java | 6 +-
.../hadoop/impl/HadoopV2JobSelfTest.java | 6 +-
.../HadoopWeightedMapReducePlannerTest.java | 6 +-
.../collections/HadoopAbstractMapTest.java | 4 +-
57 files changed, 738 insertions(+), 615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
index 84014d6..6443a67 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
@@ -18,7 +18,7 @@
package org.apache.ignite.configuration;
import org.apache.ignite.lifecycle.LifecycleBean;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java
new file mode 100644
index 0000000..4138e64
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop;
+
+import java.io.Externalizable;
+
+/**
+ * Abstract fragment of an input data source.
+ */
+public abstract class HadoopInputSplit implements Externalizable {
+ /** */
+ protected String[] hosts;
+
+ /**
+ * Array of hosts where this input split resides.
+ *
+ * @return Hosts.
+ */
+ public String[] hosts() {
+ assert hosts != null;
+
+ return hosts;
+ }
+
+ /**
+ * This method must be implemented for purpose of internal implementation.
+ *
+ * @param obj Another object.
+ * @return {@code true} If objects are equal.
+ */
+ @Override public abstract boolean equals(Object obj);
+
+ /**
+ * This method must be implemented for purpose of internal implementation.
+ *
+ * @return Hash code of the object.
+ */
+ @Override public abstract int hashCode();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java
new file mode 100644
index 0000000..8ee0330
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.hadoop;
+
+import java.util.Collection;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compact job description.
+ */
+public interface HadoopJob {
+ /**
+ * Gets collection of input splits for this job.
+ *
+ * @return Input splits.
+ */
+ public Collection<HadoopInputSplit> input();
+
+ /**
+ * Gets optional configuration property for the job.
+ *
+ * @param name Property name.
+ * @return Value or {@code null} if none.
+ */
+ @Nullable String property(String name);
+
+ /**
+ * Checks whether job has combiner.
+ *
+ * @return {@code true} If job has combiner.
+ */
+ boolean hasCombiner();
+
+ /**
+ * Checks whether job has reducer.
+ * Actual number of reducers will be in {@link HadoopMapReducePlan#reducers()}.
+ *
+ * @return Number of reducer.
+ */
+ boolean hasReducer();
+
+ /**
+ * @return Number of reducers configured for job.
+ */
+ int reducers();
+
+ /**
+ * Gets job name.
+ *
+ * @return Job name.
+ */
+ String jobName();
+
+ /**
+ * Gets user name.
+ *
+ * @return User name.
+ */
+ String user();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java
new file mode 100644
index 0000000..f77fb64
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Map-reduce job execution plan.
+ */
+public interface HadoopMapReducePlan extends Serializable {
+ /**
+ * Gets collection of file blocks for which mappers should be executed.
+ *
+ * @param nodeId Node ID to check.
+ * @return Collection of file blocks or {@code null} if no mappers should be executed on given node.
+ */
+ @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId);
+
+ /**
+ * Gets reducer IDs that should be started on given node.
+ *
+ * @param nodeId Node ID to check.
+ * @return Array of reducer IDs.
+ */
+ @Nullable public int[] reducers(UUID nodeId);
+
+ /**
+ * Gets collection of all node IDs involved in map part of job execution.
+ *
+ * @return Collection of node IDs.
+ */
+ public Collection<UUID> mapperNodeIds();
+
+ /**
+ * Gets collection of all node IDs involved in reduce part of job execution.
+ *
+ * @return Collection of node IDs.
+ */
+ public Collection<UUID> reducerNodeIds();
+
+ /**
+ * Gets overall number of mappers for the job.
+ *
+ * @return Number of mappers.
+ */
+ public int mappers();
+
+ /**
+ * Gets overall number of reducers for the job.
+ *
+ * @return Number of reducers.
+ */
+ public int reducers();
+
+ /**
+ * Gets node ID for reducer.
+ *
+ * @param reducer Reducer.
+ * @return Node ID.
+ */
+ public UUID nodeForReducer(int reducer);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java
new file mode 100644
index 0000000..8d77b70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Map-reduce execution planner.
+ */
+public interface HadoopMapReducePlanner {
+ /**
+ * Prepares map-reduce execution plan for the given job and topology.
+ *
+ * @param job Job.
+ * @param top Topology.
+ * @param oldPlan Old plan in case of partial failure.
+ * @return Map reduce plan.
+ * @throws IgniteCheckedException If an error occurs.
+ */
+ public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+ @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index ae17ac8..ab38e4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -82,12 +82,12 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
}
/** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+ @Override public HadoopJobEx createJob(Class<? extends HadoopJobEx> jobCls, HadoopJobId jobId, IgniteLogger log,
@Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
assert jobCls != null;
try {
- Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
+ Constructor<? extends HadoopJobEx> constructor = jobCls.getConstructor(HadoopJobId.class,
HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class);
return constructor.newInstance(jobId, this, log, libNames, helper);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
index bc665eb..351abce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
@@ -22,6 +22,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URI;
import java.util.Arrays;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java
deleted file mode 100644
index 998cb61..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Externalizable;
-
-/**
- * Abstract fragment of an input data source.
- */
-public abstract class HadoopInputSplit implements Externalizable {
- /** */
- protected String[] hosts;
-
- /**
- * Array of hosts where this input split resides.
- *
- * @return Hosts.
- */
- public String[] hosts() {
- assert hosts != null;
-
- return hosts;
- }
-
- /**
- * This method must be implemented for purpose of internal implementation.
- *
- * @param obj Another object.
- * @return {@code true} If objects are equal.
- */
- @Override public abstract boolean equals(Object obj);
-
- /**
- * This method must be implemented for purpose of internal implementation.
- *
- * @return Hash code of the object.
- */
- @Override public abstract int hashCode();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
deleted file mode 100644
index a77c744..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.Collection;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * Hadoop job.
- */
-public interface HadoopJob {
- /**
- * Gets job ID.
- *
- * @return Job ID.
- */
- public HadoopJobId id();
-
- /**
- * Gets job information.
- *
- * @return Job information.
- */
- public HadoopJobInfo info();
-
- /**
- * Gets collection of input splits for this job.
- *
- * @return Input splits.
- */
- public Collection<HadoopInputSplit> input() throws IgniteCheckedException;
-
- /**
- * Returns context for task execution.
- *
- * @param info Task info.
- * @return Task Context.
- * @throws IgniteCheckedException If failed.
- */
- public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException;
-
- /**
- * Does all the needed initialization for the job. Will be called on each node where tasks for this job must
- * be executed.
- * <p>
- * If job is running in external mode this method will be called on instance in Ignite node with parameter
- * {@code false} and on instance in external process with parameter {@code true}.
- *
- * @param external If {@code true} then this job instance resides in external process.
- * @param locNodeId Local node ID.
- * @throws IgniteCheckedException If failed.
- */
- public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException;
-
- /**
- * Release all the resources.
- * <p>
- * If job is running in external mode this method will be called on instance in Ignite node with parameter
- * {@code false} and on instance in external process with parameter {@code true}.
- *
- * @param external If {@code true} then this job instance resides in external process.
- * @throws IgniteCheckedException If failed.
- */
- public void dispose(boolean external) throws IgniteCheckedException;
-
- /**
- * Prepare local environment for the task.
- *
- * @param info Task info.
- * @throws IgniteCheckedException If failed.
- */
- public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
-
- /**
- * Cleans up local environment of the task.
- *
- * @param info Task info.
- * @throws IgniteCheckedException If failed.
- */
- public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
-
- /**
- * Cleans up the job staging directory.
- */
- public void cleanupStagingDirectory();
-
- /**
- * @return Ignite work directory.
- */
- public String igniteWorkDirectory();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java
new file mode 100644
index 0000000..ba78af9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopJob;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop job.
+ */
+public abstract class HadoopJobEx implements HadoopJob {
+ /**
+ * Gets job ID.
+ *
+ * @return Job ID.
+ */
+ abstract public HadoopJobId id();
+
+ /**
+ * Gets job information.
+ *
+ * @return Job information.
+ */
+ abstract public HadoopJobInfo info();
+
+ /**
+ * Gets collection of input splits for this job.
+ *
+ * @return Input splits.
+ */
+ abstract public Collection<HadoopInputSplit> input();
+
+ /**
+ * Returns context for task execution.
+ *
+ * @param info Task info.
+ * @return Task Context.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException;
+
+ /**
+ * Does all the needed initialization for the job. Will be called on each node where tasks for this job must
+ * be executed.
+ * <p>
+ * If job is running in external mode this method will be called on instance in Ignite node with parameter
+ * {@code false} and on instance in external process with parameter {@code true}.
+ *
+ * @param external If {@code true} then this job instance resides in external process.
+ * @param locNodeId Local node ID.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException;
+
+ /**
+ * Release all the resources.
+ * <p>
+ * If job is running in external mode this method will be called on instance in Ignite node with parameter
+ * {@code false} and on instance in external process with parameter {@code true}.
+ *
+ * @param external If {@code true} then this job instance resides in external process.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract public void dispose(boolean external) throws IgniteCheckedException;
+
+ /**
+ * Prepare local environment for the task.
+ *
+ * @param info Task info.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
+
+ /**
+ * Cleans up local environment of the task.
+ *
+ * @param info Task info.
+ * @throws IgniteCheckedException If failed.
+ */
+ abstract public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
+
+ /**
+ * Cleans up the job staging directory.
+ */
+ abstract public void cleanupStagingDirectory();
+
+ /**
+ * @return Ignite work directory.
+ */
+ abstract public String igniteWorkDirectory();
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String property(String name) {
+ return info().property(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasCombiner() {
+ return info().hasCombiner();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasReducer() {
+ return info().hasReducer();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int reducers() {
+ return info().reducers();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String jobName() {
+ return info().jobName();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return info().user();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 853c63d..4cc8f80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -17,29 +17,29 @@
package org.apache.ignite.internal.processors.hadoop;
-import java.io.Serializable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
import org.jetbrains.annotations.Nullable;
/**
- * Compact job description.
+ * Extended job description.
*/
-public interface HadoopJobInfo extends Serializable {
+public interface HadoopJobInfo {
/**
* Gets optional configuration property for the job.
*
* @param name Property name.
* @return Value or {@code null} if none.
*/
- @Nullable public String property(String name);
+ @Nullable String property(String name);
/**
* Checks whether job has combiner.
*
* @return {@code true} If job has combiner.
*/
- public boolean hasCombiner();
+ boolean hasCombiner();
/**
* Checks whether job has reducer.
@@ -47,42 +47,42 @@ public interface HadoopJobInfo extends Serializable {
*
* @return Number of reducer.
*/
- public boolean hasReducer();
-
- /**
- * Creates new job instance for the given ID.
- * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution.
- * This method will be called once for the same ID on one node, though it can be called on the same host
- * multiple times from different processes (in case of multiple nodes on the same host or external execution).
- *
- * @param jobCls The job class.
- * @param jobId Job ID.
- * @param log Logger.
- * @param libNames Optional additional native library names.
- * @param helper HadoopHelper.
- * @return Job.
- * @throws IgniteCheckedException If failed.
- */
- public HadoopJob createJob(Class<? extends HadoopJob> jobCls,
- HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper)
- throws IgniteCheckedException;
+ boolean hasReducer();
/**
* @return Number of reducers configured for job.
*/
- public int reducers();
+ int reducers();
/**
* Gets job name.
*
* @return Job name.
*/
- public String jobName();
+ String jobName();
/**
* Gets user name.
*
* @return User name.
*/
- public String user();
+ String user();
+
+ /**
+ * Creates new job instance for the given ID.
+ * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution.
+ * This method will be called once for the same ID on one node, though it can be called on the same host
+ * multiple times from different processes (in case of multiple nodes on the same host or external execution).
+ *
+ * @param jobCls The job class.
+ * @param jobId Job ID.
+ * @param log Logger.
+ * @param libNames Optional additional native library names.
+ * @param helper HadoopHelper.
+ * @return Job.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopJobEx createJob(Class<? extends HadoopJobEx> jobCls,
+ HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper)
+ throws IgniteCheckedException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
deleted file mode 100644
index aadc2bf..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.UUID;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Map-reduce job execution plan.
- */
-public interface HadoopMapReducePlan extends Serializable {
- /**
- * Gets collection of file blocks for which mappers should be executed.
- *
- * @param nodeId Node ID to check.
- * @return Collection of file blocks or {@code null} if no mappers should be executed on given node.
- */
- @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId);
-
- /**
- * Gets reducer IDs that should be started on given node.
- *
- * @param nodeId Node ID to check.
- * @return Array of reducer IDs.
- */
- @Nullable public int[] reducers(UUID nodeId);
-
- /**
- * Gets collection of all node IDs involved in map part of job execution.
- *
- * @return Collection of node IDs.
- */
- public Collection<UUID> mapperNodeIds();
-
- /**
- * Gets collection of all node IDs involved in reduce part of job execution.
- *
- * @return Collection of node IDs.
- */
- public Collection<UUID> reducerNodeIds();
-
- /**
- * Gets overall number of mappers for the job.
- *
- * @return Number of mappers.
- */
- public int mappers();
-
- /**
- * Gets overall number of reducers for the job.
- *
- * @return Number of reducers.
- */
- public int reducers();
-
- /**
- * Gets node ID for reducer.
- *
- * @param reducer Reducer.
- * @return Node ID.
- */
- public UUID nodeForReducer(int reducer);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
deleted file mode 100644
index 0009c4a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Map-reduce execution planner.
- */
-public interface HadoopMapReducePlanner {
- /**
- * Prepares map-reduce execution plan for the given job and topology.
- *
- * @param job Job.
- * @param top Topology.
- * @param oldPlan Old plan in case of partial failure.
- * @return Map reduce plan.
- * @throws IgniteCheckedException If an error occurs.
- */
- public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
- @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index dddd017..194c1dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawCompar
*/
public abstract class HadoopTaskContext {
/** */
- protected final HadoopJob job;
+ protected final HadoopJobEx job;
/** */
private HadoopTaskInput input;
@@ -44,7 +44,7 @@ public abstract class HadoopTaskContext {
* @param taskInfo Task info.
* @param job Job.
*/
- protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJob job) {
+ protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job) {
this.taskInfo = taskInfo;
this.job = job;
}
@@ -88,7 +88,7 @@ public abstract class HadoopTaskContext {
/**
* @return Job.
*/
- public HadoopJob job() {
+ public HadoopJobEx job() {
return job;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
index 3509367..eb3113c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
index 6c033b2..93a69db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.hadoop.counter;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
/**
* The object that writes some system counters to some storage for each running job. This operation is a part of
@@ -32,5 +32,5 @@ public interface HadoopCounterWriter {
* @param cntrs Counters.
* @throws IgniteCheckedException If failed.
*/
- public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException;
+ public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 212e94a..02bad40 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1110,13 +1110,13 @@ org.apache.ignite.internal.processors.dr.GridDrType
org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater
org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo
org.apache.ignite.internal.processors.hadoop.HadoopFileBlock
-org.apache.ignite.internal.processors.hadoop.HadoopInputSplit
+org.apache.ignite.hadoop.HadoopInputSplit
org.apache.ignite.internal.processors.hadoop.HadoopJobId
org.apache.ignite.internal.processors.hadoop.HadoopJobInfo
org.apache.ignite.internal.processors.hadoop.HadoopJobPhase
org.apache.ignite.internal.processors.hadoop.HadoopJobProperty
org.apache.ignite.internal.processors.hadoop.HadoopJobStatus
-org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan
+org.apache.ignite.hadoop.HadoopMapReducePlan
org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo
org.apache.ignite.internal.processors.hadoop.HadoopTaskType
org.apache.ignite.internal.processors.hadoop.message.HadoopMessage
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index f1c1b16..1128fa4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -18,7 +18,7 @@
package org.apache.ignite.hadoop.fs;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
@@ -41,7 +41,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
private volatile HadoopFileSystemCounterWriterDelegate delegate;
/** {@inheritDoc} */
- @Override public void write(HadoopJob job, HadoopCounters cntrs)
+ @Override public void write(HadoopJobEx job, HadoopCounters cntrs)
throws IgniteCheckedException {
delegate(job).write(job, cntrs);
}
@@ -52,7 +52,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
* @param job Job.
* @return Delegate.
*/
- private HadoopFileSystemCounterWriterDelegate delegate(HadoopJob job) {
+ private HadoopFileSystemCounterWriterDelegate delegate(HadoopJobEx job) {
HadoopFileSystemCounterWriterDelegate delegate0 = delegate;
if (delegate0 == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 178cdb5..bb0b47f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -21,16 +21,16 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.HadoopJob;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.hadoop.planner.HadoopAbstractMapReducePlanner;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
@@ -117,7 +117,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
@Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
@Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input());
- int reducerCnt = job.info().reducers();
+ int reducerCnt = job.reducers();
if (reducerCnt < 0)
throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java
new file mode 100644
index 0000000..dd01f11
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.planner;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
+
+/**
+ * Base class for map-reduce planners.
+ */
+public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
+ /** Injected grid. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Logger. */
+ @SuppressWarnings("UnusedDeclaration")
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /**
+ * Create plan topology.
+ *
+ * @param nodes Topology nodes.
+ * @return Plan topology.
+ */
+ protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
+ Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
+
+ Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
+ Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
+
+ for (ClusterNode node : nodes) {
+ String macs = node.attribute(ATTR_MACS);
+
+ HadoopMapReducePlanGroup grp = macsMap.get(macs);
+
+ if (grp == null) {
+ grp = new HadoopMapReducePlanGroup(node, macs);
+
+ macsMap.put(macs, grp);
+ }
+ else
+ grp.add(node);
+
+ idToGrp.put(node.id(), grp);
+
+ for (String host : node.addresses()) {
+ HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
+
+ if (hostGrp == null)
+ hostToGrp.put(host, grp);
+ else
+ assert hostGrp == grp;
+ }
+ }
+
+ return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
+ }
+
+
+ /**
+ * Groups nodes by host names.
+ *
+ * @param top Topology to group.
+ * @return Map.
+ */
+ protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
+ Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
+
+ for (ClusterNode node : top) {
+ for (String host : node.hostNames()) {
+ Collection<UUID> nodeIds = grouped.get(host);
+
+ if (nodeIds == null) {
+ // Expecting 1-2 nodes per host.
+ nodeIds = new ArrayList<>(2);
+
+ grouped.put(host, nodeIds);
+ }
+
+ nodeIds.add(node.id());
+ }
+ }
+
+ return grouped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java
new file mode 100644
index 0000000..d9de0c1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.planner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Round-robin mr planner.
+ */
+public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner {
+ /** {@inheritDoc} */
+ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+ @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+ if (top.isEmpty())
+ throw new IllegalArgumentException("Topology is empty");
+
+ // Has at least one element.
+ Iterator<ClusterNode> it = top.iterator();
+
+ Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+ for (HadoopInputSplit block : job.input()) {
+ ClusterNode node = it.next();
+
+ Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id());
+
+ if (nodeBlocks == null) {
+ nodeBlocks = new ArrayList<>();
+
+ mappers.put(node.id(), nodeBlocks);
+ }
+
+ nodeBlocks.add(block);
+
+ if (!it.hasNext())
+ it = top.iterator();
+ }
+
+ int[] rdc = new int[job.reducers()];
+
+ for (int i = 0; i < rdc.length; i++)
+ rdc[i] = i;
+
+ return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
index 37af147..7e74d82 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
index 4326ad2..f125485 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
index a9b4532..7db535a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.hadoop;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.util.typedef.internal.S;
import java.io.Externalizable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
index fb6d0f3..fe5d434 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
@@ -22,6 +22,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
index 541cf80..0d82b5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.processors.hadoop.delegate;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
/**
@@ -32,5 +32,5 @@ public interface HadoopFileSystemCounterWriterDelegate {
* @param cntrs Counters.
* @throws IgniteCheckedException If failed.
*/
- public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException;
+ public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
index d4c10da..6b36d26 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
@@ -60,7 +60,7 @@ public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSyst
}
/** {@inheritDoc} */
- public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException {
+ public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException {
Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
final HadoopJobInfo jobInfo = job.info();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index 65ff280..cde6da6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -54,7 +54,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- HadoopJob job = taskCtx.job();
+ HadoopJobEx job = taskCtx.job();
HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 92c024e..6b90653 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -51,7 +51,7 @@ public class HadoopV1ReduceTask extends HadoopV1Task {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- HadoopJob job = taskCtx.job();
+ HadoopJobEx job = taskCtx.job();
HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
index 11a3598..26325b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index eec0636..11f2ecc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index a24e581..5d3f22d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
@@ -37,8 +38,8 @@ import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
@@ -85,7 +86,7 @@ import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSys
/**
* Hadoop job implementation for v2 API.
*/
-public class HadoopV2Job implements HadoopJob {
+public class HadoopV2Job extends HadoopJobEx {
/** */
private final JobConf jobConf;
@@ -139,6 +140,7 @@ public class HadoopV2Job implements HadoopJob {
* @param jobInfo Job info.
* @param log Logger.
* @param libNames Optional additional native library names.
+ * @param helper Hadoop helper.
*/
public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log,
@Nullable String[] libNames, HadoopHelper helper) {
@@ -182,7 +184,7 @@ public class HadoopV2Job implements HadoopJob {
}
/** {@inheritDoc} */
- @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+ @Override public Collection<HadoopInputSplit> input() {
ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf.getClassLoader());
try {
@@ -239,6 +241,9 @@ public class HadoopV2Job implements HadoopJob {
throw transformException(e);
}
}
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
}
@@ -274,7 +279,7 @@ public class HadoopV2Job implements HadoopJob {
fullCtxClsQueue.add(cls);
}
- Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class,
+ Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJobEx.class,
HadoopJobId.class, UUID.class, DataInput.class);
if (jobConfData == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
index 667ef1e..c878515 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index d328550..8b8a728 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -45,8 +45,8 @@ import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
@@ -165,7 +165,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
* @param locNodeId Local node ID.
* @param jobConfDataInput DataInput for read JobConf.
*/
- public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+ public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job, HadoopJobId jobId,
@Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
super(taskInfo, job);
this.locNodeId = locNodeId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
index 090b336..1035701 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -24,11 +24,11 @@ import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 9542372..9284c02 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -49,14 +49,14 @@ import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
import org.apache.ignite.internal.processors.hadoop.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
@@ -114,7 +114,7 @@ public class HadoopJobTracker extends HadoopComponent {
private HadoopMapReducePlanner mrPlanner;
/** All the known jobs. */
- private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJob>> jobs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobEx>> jobs = new ConcurrentHashMap8<>();
/** Locally active jobs. */
private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
@@ -129,8 +129,8 @@ public class HadoopJobTracker extends HadoopComponent {
/** Component busy lock. */
private GridSpinReadWriteLock busyLock;
- /** Class to create HadoopJob instances from. */
- private Class<? extends HadoopJob> jobCls;
+ /** Class to create HadoopJobEx instances from. */
+ private Class<? extends HadoopJobEx> jobCls;
/** Closure to check result of async transform of system cache. */
private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
@@ -158,7 +158,7 @@ public class HadoopJobTracker extends HadoopComponent {
HadoopClassLoader ldr = ctx.kernalContext().hadoopHelper().commonClassLoader();
try {
- jobCls = (Class<HadoopJob>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME);
+ jobCls = (Class<HadoopJobEx>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME);
}
catch (Exception ioe) {
throw new IgniteCheckedException("Failed to load job class [class=" +
@@ -310,7 +310,7 @@ public class HadoopJobTracker extends HadoopComponent {
if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
- HadoopJob job = job(jobId, info);
+ HadoopJobEx job = job(jobId, info);
HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
@@ -692,7 +692,7 @@ public class HadoopJobTracker extends HadoopComponent {
try {
if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
// Failover setup task.
- HadoopJob job = job(jobId, meta.jobInfo());
+ HadoopJobEx job = job(jobId, meta.jobInfo());
Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
@@ -818,7 +818,7 @@ public class HadoopJobTracker extends HadoopComponent {
throws IgniteCheckedException {
JobLocalState state = activeJobs.get(jobId);
- HadoopJob job = job(jobId, meta.jobInfo());
+ HadoopJobEx job = job(jobId, meta.jobInfo());
HadoopMapReducePlan plan = meta.mapReducePlan();
@@ -1048,7 +1048,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param job Job instance.
* @return Collection of task infos.
*/
- private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) {
+ private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJobEx job) {
UUID locNodeId = ctx.localNodeId();
HadoopJobId jobId = job.id();
@@ -1097,15 +1097,15 @@ public class HadoopJobTracker extends HadoopComponent {
* @return Job.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
- GridFutureAdapter<HadoopJob> fut = jobs.get(jobId);
+ @Nullable public HadoopJobEx job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
+ GridFutureAdapter<HadoopJobEx> fut = jobs.get(jobId);
- if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJob>())) != null)
+ if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJobEx>())) != null)
return fut.get();
fut = jobs.get(jobId);
- HadoopJob job = null;
+ HadoopJobEx job = null;
try {
if (jobInfo == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
deleted file mode 100644
index f01f72b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
-
-/**
- * Base class for map-reduce planners.
- */
-public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
- /** Injected grid. */
- @IgniteInstanceResource
- protected Ignite ignite;
-
- /** Logger. */
- @SuppressWarnings("UnusedDeclaration")
- @LoggerResource
- protected IgniteLogger log;
-
- /**
- * Create plan topology.
- *
- * @param nodes Topology nodes.
- * @return Plan topology.
- */
- protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
- Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
-
- Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
- Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
-
- for (ClusterNode node : nodes) {
- String macs = node.attribute(ATTR_MACS);
-
- HadoopMapReducePlanGroup grp = macsMap.get(macs);
-
- if (grp == null) {
- grp = new HadoopMapReducePlanGroup(node, macs);
-
- macsMap.put(macs, grp);
- }
- else
- grp.add(node);
-
- idToGrp.put(node.id(), grp);
-
- for (String host : node.addresses()) {
- HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
-
- if (hostGrp == null)
- hostToGrp.put(host, grp);
- else
- assert hostGrp == grp;
- }
- }
-
- return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
- }
-
-
- /**
- * Groups nodes by host names.
- *
- * @param top Topology to group.
- * @return Map.
- */
- protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
- Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
- for (ClusterNode node : top) {
- for (String host : node.hostNames()) {
- Collection<UUID> nodeIds = grouped.get(host);
-
- if (nodeIds == null) {
- // Expecting 1-2 nodes per host.
- nodeIds = new ArrayList<>(2);
-
- grouped.put(host, nodeIds);
- }
-
- nodeIds.add(node.id());
- }
- }
-
- return grouped;
- }
-}