You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 15:14:24 UTC
[06/13] incubator-ignite git commit: # IGNITE-386: Moving core
classes (6).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java
new file mode 100644
index 0000000..41d9847
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * Task output.
+ */
+public interface HadoopTaskOutput extends AutoCloseable {
+ /**
+ * Writes key and value to the output.
+ *
+ * @param key Key.
+ * @param val Value.
+ */
+ public void write(Object key, Object val) throws IgniteCheckedException;
+
+ /**
+ * Closes output.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public void close() throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java
new file mode 100644
index 0000000..a88e189
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jetbrains.annotations.*;
+
+/**
+* Task type.
+*/
+public enum HadoopTaskType {
+ /** Setup task. */
+ SETUP,
+
+ /** Map task. */
+ MAP,
+
+ /** Reduce task. */
+ REDUCE,
+
+ /** Combine task. */
+ COMBINE,
+
+ /** Commit task. */
+ COMMIT,
+
+ /** Abort task. */
+ ABORT;
+
+ /** Enumerated values. */
+ private static final HadoopTaskType[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value.
+ */
+ @Nullable public static HadoopTaskType fromOrdinal(byte ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
index d0ef4ce..caa9194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
@@ -33,7 +33,7 @@ public class IgniteHadoopNoopProcessor extends IgniteHadoopProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public GridHadoop hadoop() {
+ @Override public Hadoop hadoop() {
throw new IllegalStateException("Hadoop module is not found in class path.");
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
index c2cf542..d40d5e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
@@ -35,7 +35,7 @@ public abstract class IgniteHadoopProcessorAdapter extends GridProcessorAdapter
/**
* @return Hadoop facade.
*/
- public abstract GridHadoop hadoop();
+ public abstract Hadoop hadoop();
/**
* @return Hadoop configuration.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 449cff2..01e554c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -31,7 +31,7 @@ import java.util.*;
/**
* Statistic writer implementation that writes info into any Hadoop file system.
*/
-public class IgniteHadoopFileSystemCounterWriter implements GridHadoopCounterWriter {
+public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter {
/** */
public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
index 3482640..39b9ba6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
@@ -39,7 +39,7 @@ public class HadoopCounters extends Counters {
* @param cntrs Counters to adapt.
*/
public HadoopCounters(GridHadoopCounters cntrs) {
- for (GridHadoopCounter cntr : cntrs.all())
+ for (HadoopCounter cntr : cntrs.all())
if (cntr instanceof HadoopLongCounter)
this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
}
@@ -73,7 +73,7 @@ public class HadoopCounters extends Counters {
@Override public synchronized Iterable<String> getGroupNames() {
Collection<String> res = new HashSet<>();
- for (GridHadoopCounter counter : cntrs.values())
+ for (HadoopCounter counter : cntrs.values())
res.add(counter.group());
return res;
@@ -167,7 +167,7 @@ public class HadoopCounters extends Counters {
public int groupSize(String grpName) {
int res = 0;
- for (GridHadoopCounter counter : cntrs.values()) {
+ for (HadoopCounter counter : cntrs.values()) {
if (grpName.equals(counter.group()))
res++;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 2f44778..438874a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -82,7 +82,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
}
/** {@inheritDoc} */
- @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
+ @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
try {
Class<?> jobCls0 = jobCls;
@@ -99,7 +99,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, HadoopDefaultJobInfo.class,
IgniteLogger.class);
- return (GridHadoopJob)constructor.newInstance(jobId, this, log);
+ return (HadoopJob)constructor.newInstance(jobId, this, log);
}
// NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call.
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
index b87e7f8..b4f2c87 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.*;
/**
* Hadoop facade implementation.
*/
-public class HadoopImpl implements GridHadoop {
+public class HadoopImpl implements Hadoop {
/** Hadoop processor. */
private final HadoopProcessor proc;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
index 1f50b0c..75e55fd 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -45,7 +45,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
/** Hadoop facade for public API. */
@GridToStringExclude
- private GridHadoop hadoop;
+ private Hadoop hadoop;
/**
* @param ctx Kernal context.
@@ -158,7 +158,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public GridHadoop hadoop() {
+ @Override public Hadoop hadoop() {
if (hadoop == null)
throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
"is HADOOP_HOME environment variable set?)");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
index 3fdce14..4b96f7d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
@@ -26,7 +26,7 @@ import java.io.*;
/**
* Default Hadoop counter implementation.
*/
-public abstract class HadoopCounterAdapter implements GridHadoopCounter, Externalizable {
+public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
index 01b1473..bfd59ef 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
@@ -36,7 +36,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
private static final long serialVersionUID = 0L;
/** */
- private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
/**
* Default constructor. Creates new instance without counters.
@@ -50,7 +50,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
*
* @param cntrs Counters to store.
*/
- public HadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) {
+ public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) {
addCounters(cntrs, true);
}
@@ -71,7 +71,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
* @param name Counter name.
* @return Counter.
*/
- private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp,
+ private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp,
String name) {
try {
Constructor constructor = cls.getConstructor(String.class, String.class);
@@ -89,12 +89,12 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
* @param cntrs Counters to add.
* @param cp Whether to copy counters or not.
*/
- private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) {
+ private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) {
assert cntrs != null;
- for (GridHadoopCounter cntr : cntrs) {
+ for (HadoopCounter cntr : cntrs) {
if (cp) {
- GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
+ HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
cntrCp.merge(cntr);
@@ -106,7 +106,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
}
/** {@inheritDoc} */
- @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
+ @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
assert cls != null;
CounterKey mapKey = new CounterKey(cls, grp, name);
@@ -126,13 +126,13 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
}
/** {@inheritDoc} */
- @Override public Collection<GridHadoopCounter> all() {
+ @Override public Collection<HadoopCounter> all() {
return cntrsMap.values();
}
/** {@inheritDoc} */
@Override public void merge(GridHadoopCounters other) {
- for (GridHadoopCounter counter : other.all())
+ for (HadoopCounter counter : other.all())
counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
}
@@ -144,7 +144,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- addCounters(U.<GridHadoopCounter>readCollection(in), false);
+ addCounters(U.<HadoopCounter>readCollection(in), false);
}
/** {@inheritDoc} */
@@ -173,7 +173,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
/**
* The tuple of counter identifier components for more readable code.
*/
- private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> {
+ private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> {
/** */
private static final long serialVersionUID = 0L;
@@ -184,7 +184,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
* @param grp Group name.
* @param name Counter name.
*/
- private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) {
+ private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) {
super(cls, grp, name);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
index 1aa1e0e..d926706 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
@@ -59,7 +59,7 @@ public class HadoopLongCounter extends HadoopCounterAdapter {
}
/** {@inheritDoc} */
- @Override public void merge(GridHadoopCounter cntr) {
+ @Override public void merge(HadoopCounter cntr) {
val += ((HadoopLongCounter)cntr).val;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
index f22d0cd..6f57ae4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -97,7 +97,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
}
/** {@inheritDoc} */
- @Override public void merge(GridHadoopCounter cntr) {
+ @Override public void merge(HadoopCounter cntr) {
evts.addAll(((HadoopPerformanceCounter)cntr).evts);
}
@@ -162,7 +162,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param ts Timestamp of the event.
*/
public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
- if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) {
+ if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) {
evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
index 6042775..2d64277 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -48,7 +48,7 @@ public class HadoopJobMetadata implements Externalizable {
private GridHadoopMapReducePlan mrPlan;
/** Pending splits for which mapper should be executed. */
- private Map<GridHadoopInputSplit, Integer> pendingSplits;
+ private Map<HadoopInputSplit, Integer> pendingSplits;
/** Pending reducers. */
private Collection<Integer> pendingReducers;
@@ -154,7 +154,7 @@ public class HadoopJobMetadata implements Externalizable {
*
* @param pendingSplits Collection of pending splits.
*/
- public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) {
+ public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
this.pendingSplits = pendingSplits;
}
@@ -163,7 +163,7 @@ public class HadoopJobMetadata implements Externalizable {
*
* @return Collection of pending splits.
*/
- public Map<GridHadoopInputSplit, Integer> pendingSplits() {
+ public Map<HadoopInputSplit, Integer> pendingSplits() {
return pendingSplits;
}
@@ -261,7 +261,7 @@ public class HadoopJobMetadata implements Externalizable {
* @param split Split.
* @return Task number.
*/
- public int taskNumber(GridHadoopInputSplit split) {
+ public int taskNumber(HadoopInputSplit split) {
return pendingSplits.get(split);
}
@@ -287,7 +287,7 @@ public class HadoopJobMetadata implements Externalizable {
jobId = (GridHadoopJobId)in.readObject();
jobInfo = (GridHadoopJobInfo)in.readObject();
mrPlan = (GridHadoopMapReducePlan)in.readObject();
- pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject();
+ pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
pendingReducers = (Collection<Integer>)in.readObject();
phase = (GridHadoopJobPhase)in.readObject();
failCause = (Throwable)in.readObject();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index a0ae3f6..99a759d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
/**
@@ -66,7 +66,7 @@ public class HadoopJobTracker extends HadoopComponent {
private GridHadoopMapReducePlanner mrPlanner;
/** All the known jobs. */
- private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<GridHadoopJob>> jobs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<HadoopJob>> jobs = new ConcurrentHashMap8<>();
/** Locally active jobs. */
private final ConcurrentMap<GridHadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
@@ -246,7 +246,7 @@ public class HadoopJobTracker extends HadoopComponent {
if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
- GridHadoopJob job = job(jobId, info);
+ HadoopJob job = job(jobId, info);
GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
@@ -511,13 +511,13 @@ public class HadoopJobTracker extends HadoopComponent {
* @return Collection of all input splits that should be processed.
*/
@SuppressWarnings("ConstantConditions")
- private Map<GridHadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) {
- Map<GridHadoopInputSplit, Integer> res = new HashMap<>();
+ private Map<HadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) {
+ Map<HadoopInputSplit, Integer> res = new HashMap<>();
int taskNum = 0;
for (UUID nodeId : plan.mapperNodeIds()) {
- for (GridHadoopInputSplit split : plan.mappers(nodeId)) {
+ for (HadoopInputSplit split : plan.mappers(nodeId)) {
if (res.put(split, taskNum++) != null)
throw new IllegalStateException("Split duplicate.");
}
@@ -568,7 +568,7 @@ public class HadoopJobTracker extends HadoopComponent {
try {
if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
// Failover setup task.
- GridHadoopJob job = job(jobId, meta.jobInfo());
+ HadoopJob job = job(jobId, meta.jobInfo());
Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId);
@@ -579,12 +579,12 @@ public class HadoopJobTracker extends HadoopComponent {
else if (phase == PHASE_MAP || phase == PHASE_REDUCE) {
// Must check all nodes, even that are not event node ID due to
// multiple node failure possibility.
- Collection<GridHadoopInputSplit> cancelSplits = null;
+ Collection<HadoopInputSplit> cancelSplits = null;
for (UUID nodeId : plan.mapperNodeIds()) {
if (ctx.kernalContext().discovery().node(nodeId) == null) {
// Node has left the grid.
- Collection<GridHadoopInputSplit> mappers = plan.mappers(nodeId);
+ Collection<HadoopInputSplit> mappers = plan.mappers(nodeId);
if (cancelSplits == null)
cancelSplits = new HashSet<>();
@@ -693,7 +693,7 @@ public class HadoopJobTracker extends HadoopComponent {
throws IgniteCheckedException {
JobLocalState state = activeJobs.get(jobId);
- GridHadoopJob job = job(jobId, meta.jobInfo());
+ HadoopJob job = job(jobId, meta.jobInfo());
GridHadoopMapReducePlan plan = meta.mapReducePlan();
@@ -770,13 +770,13 @@ public class HadoopJobTracker extends HadoopComponent {
}
else {
// Check if there are unscheduled mappers or reducers.
- Collection<GridHadoopInputSplit> cancelMappers = new ArrayList<>();
+ Collection<HadoopInputSplit> cancelMappers = new ArrayList<>();
Collection<Integer> cancelReducers = new ArrayList<>();
- Collection<GridHadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
+ Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
if (mappers != null) {
- for (GridHadoopInputSplit b : mappers) {
+ for (HadoopInputSplit b : mappers) {
if (state == null || !state.mapperScheduled(b))
cancelMappers.add(b);
}
@@ -836,7 +836,7 @@ public class HadoopJobTracker extends HadoopComponent {
if (statWriterClsName != null) {
Class<?> cls = ldr.loadClass(statWriterClsName);
- GridHadoopCounterWriter writer = (GridHadoopCounterWriter)cls.newInstance();
+ HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance();
GridHadoopCounters cntrs = meta.counters();
@@ -879,7 +879,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param meta Job metadata.
* @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
*/
- private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, HadoopJobMetadata meta) {
+ private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) {
UUID locNodeId = ctx.localNodeId();
GridHadoopJobId jobId = meta.jobId();
@@ -891,7 +891,7 @@ public class HadoopJobTracker extends HadoopComponent {
if (state == null)
state = initState(jobId);
- for (GridHadoopInputSplit split : mappers) {
+ for (HadoopInputSplit split : mappers) {
if (state.addMapper(split)) {
if (log.isDebugEnabled())
log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId +
@@ -917,7 +917,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param job Job instance.
* @return Collection of task infos.
*/
- private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, GridHadoopJob job) {
+ private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) {
UUID locNodeId = ctx.localNodeId();
GridHadoopJobId jobId = job.id();
@@ -966,15 +966,15 @@ public class HadoopJobTracker extends HadoopComponent {
* @return Job.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException {
- GridFutureAdapterEx<GridHadoopJob> fut = jobs.get(jobId);
+ @Nullable public HadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException {
+ GridFutureAdapterEx<HadoopJob> fut = jobs.get(jobId);
- if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<GridHadoopJob>())) != null)
+ if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<HadoopJob>())) != null)
return fut.get();
fut = jobs.get(jobId);
- GridHadoopJob job = null;
+ HadoopJob job = null;
try {
if (jobInfo == null) {
@@ -1103,7 +1103,7 @@ public class HadoopJobTracker extends HadoopComponent {
*/
private class JobLocalState {
/** Mappers. */
- private final Collection<GridHadoopInputSplit> currMappers = new HashSet<>();
+ private final Collection<HadoopInputSplit> currMappers = new HashSet<>();
/** Reducers. */
private final Collection<Integer> currReducers = new HashSet<>();
@@ -1121,7 +1121,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param mapSplit Map split to add.
* @return {@code True} if mapper was added.
*/
- private boolean addMapper(GridHadoopInputSplit mapSplit) {
+ private boolean addMapper(HadoopInputSplit mapSplit) {
return currMappers.add(mapSplit);
}
@@ -1139,7 +1139,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param mapSplit Map split to check.
* @return {@code True} if mapper was scheduled.
*/
- public boolean mapperScheduled(GridHadoopInputSplit mapSplit) {
+ public boolean mapperScheduled(HadoopInputSplit mapSplit) {
return currMappers.contains(mapSplit);
}
@@ -1315,7 +1315,7 @@ public class HadoopJobTracker extends HadoopComponent {
private static final long serialVersionUID = 0L;
/** Mapper split to remove. */
- private final Collection<GridHadoopInputSplit> splits;
+ private final Collection<HadoopInputSplit> splits;
/** Error. */
private final Throwable err;
@@ -1325,7 +1325,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param split Mapper split to remove.
* @param err Error.
*/
- private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) {
+ private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) {
this(prev, Collections.singletonList(split), err);
}
@@ -1334,7 +1334,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param splits Mapper splits to remove.
* @param err Error.
*/
- private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits,
+ private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<HadoopInputSplit> splits,
Throwable err) {
super(prev);
@@ -1344,9 +1344,9 @@ public class HadoopJobTracker extends HadoopComponent {
/** {@inheritDoc} */
@Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
- Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+ Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
- for (GridHadoopInputSplit s : splits)
+ for (HadoopInputSplit s : splits)
splitsCp.remove(s);
cp.pendingSplits(splitsCp);
@@ -1466,7 +1466,7 @@ public class HadoopJobTracker extends HadoopComponent {
private static final long serialVersionUID = 0L;
/** Mapper split to remove. */
- private final Collection<GridHadoopInputSplit> splits;
+ private final Collection<HadoopInputSplit> splits;
/** Reducers to remove. */
private final Collection<Integer> rdc;
@@ -1488,7 +1488,7 @@ public class HadoopJobTracker extends HadoopComponent {
* @param rdc Reducers to remove.
*/
private CancelJobProcessor(@Nullable StackedProcessor prev,
- Collection<GridHadoopInputSplit> splits,
+ Collection<HadoopInputSplit> splits,
Collection<Integer> rdc) {
this(prev, null, splits, rdc);
}
@@ -1501,7 +1501,7 @@ public class HadoopJobTracker extends HadoopComponent {
*/
private CancelJobProcessor(@Nullable StackedProcessor prev,
Throwable err,
- Collection<GridHadoopInputSplit> splits,
+ Collection<HadoopInputSplit> splits,
Collection<Integer> rdc) {
super(prev);
@@ -1521,10 +1521,10 @@ public class HadoopJobTracker extends HadoopComponent {
cp.pendingReducers(rdcCp);
- Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+ Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
if (splits != null) {
- for (GridHadoopInputSplit s : splits)
+ for (HadoopInputSplit s : splits)
splitsCp.remove(s);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
index 9ec2b5b..f24e8f2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java
@@ -30,7 +30,7 @@ public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
private static final long serialVersionUID = 0L;
/** Mappers map. */
- private Map<UUID, Collection<GridHadoopInputSplit>> mappers;
+ private Map<UUID, Collection<HadoopInputSplit>> mappers;
/** Reducers map. */
private Map<UUID, int[]> reducers;
@@ -45,13 +45,13 @@ public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
* @param mappers Mappers map.
* @param reducers Reducers map.
*/
- public HadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers,
+ public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> mappers,
Map<UUID, int[]> reducers) {
this.mappers = mappers;
this.reducers = reducers;
if (mappers != null) {
- for (Collection<GridHadoopInputSplit> splits : mappers.values())
+ for (Collection<HadoopInputSplit> splits : mappers.values())
mappersCnt += splits.size();
}
@@ -86,7 +86,7 @@ public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
}
/** {@inheritDoc} */
- @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) {
+ @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId) {
return mappers.get(nodeId);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
index a625b3d..6e6e874 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java
@@ -47,7 +47,7 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner
private IgniteLogger log;
/** {@inheritDoc} */
- @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top,
+ @Override public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
@Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException {
// Convert collection of topology nodes to collection of topology node IDs.
Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
@@ -55,7 +55,7 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner
for (ClusterNode topNode : top)
topIds.add(topNode.id());
- Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input());
+ Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input());
int rdcCnt = job.info().reducers();
@@ -76,9 +76,9 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner
* @return Mappers map.
* @throws IgniteCheckedException If failed.
*/
- private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
- Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException {
- Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>();
+ private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
+ Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
+ Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
Map<String, Collection<UUID>> nodes = hosts(top);
@@ -87,13 +87,13 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner
for (UUID nodeId : topIds)
nodeLoads.put(nodeId, 0);
- for (GridHadoopInputSplit split : splits) {
+ for (HadoopInputSplit split : splits) {
UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
if (log.isDebugEnabled())
log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
- Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId);
+ Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId);
if (nodeSplits == null) {
nodeSplits = new ArrayList<>();
@@ -147,10 +147,10 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner
* @return Node ID.
*/
@SuppressWarnings("unchecked")
- private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
+ private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
- if (split instanceof GridHadoopFileBlock) {
- GridHadoopFileBlock split0 = (GridHadoopFileBlock)split;
+ if (split instanceof HadoopFileBlock) {
+ HadoopFileBlock split0 = (HadoopFileBlock)split;
if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
@@ -293,14 +293,14 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner
* @return Reducers map.
*/
private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
- Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) {
+ Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) {
// Determine initial node weights.
int totalWeight = 0;
List<WeightedNode> nodes = new ArrayList<>(top.size());
for (ClusterNode node : top) {
- Collection<GridHadoopInputSplit> split = mappers.get(node.id());
+ Collection<HadoopInputSplit> split = mappers.get(node.id());
int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
index 56da194..6625d7d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
@@ -31,7 +31,7 @@ public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<Gri
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ @Override public GridHadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop,
HadoopProtocolTaskArguments args) throws IgniteCheckedException {
UUID nodeId = UUID.fromString(args.<String>get(0));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
index ac70c44..0714eb1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
@@ -40,7 +40,7 @@ public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridH
private static final String ATTR_HELD = "held";
/** {@inheritDoc} */
- @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop,
+ @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop,
HadoopProtocolTaskArguments args) throws IgniteCheckedException {
UUID nodeId = UUID.fromString(args.<String>get(0));
Integer id = args.get(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
index 8522ab0..fc0e484 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
@@ -31,7 +31,7 @@ public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop,
HadoopProtocolTaskArguments args) throws IgniteCheckedException {
UUID nodeId = UUID.fromString(args.<String>get(0));
Integer id = args.get(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
index 357e12d..e30feb7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
@@ -28,7 +28,7 @@ public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<Grid
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ @Override public GridHadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop,
HadoopProtocolTaskArguments args) {
return hadoop.nextJobId();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
index df03c79..1da4b58 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
@@ -33,7 +33,7 @@ public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<GridH
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop,
+ @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop,
HadoopProtocolTaskArguments args) throws IgniteCheckedException {
UUID nodeId = UUID.fromString(args.<String>get(0));
Integer id = args.get(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
index 6938d1c..f763ccc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
@@ -108,6 +108,6 @@ public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<Hadoop
* @return Job result.
* @throws IgniteCheckedException If failed.
*/
- public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, HadoopProtocolTaskArguments args)
+ public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args)
throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 267316e..f3c7837 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -199,7 +199,7 @@ public class HadoopShuffle extends HadoopComponent {
* @param taskCtx Task info.
* @return Output.
*/
- public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return job(taskCtx.taskInfo().jobId()).output(taskCtx);
}
@@ -207,7 +207,7 @@ public class HadoopShuffle extends HadoopComponent {
* @param taskCtx Task info.
* @return Input.
*/
- public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return job(taskCtx.taskInfo().jobId()).input(taskCtx);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index a75b34b..3dab6eb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -47,7 +47,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
private static final int MSG_BUF_SIZE = 128 * 1024;
/** */
- private final GridHadoopJob job;
+ private final HadoopJob job;
/** */
private final GridUnsafeMemory mem;
@@ -56,7 +56,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
private final boolean needPartitioner;
/** Collection of task contexts for each reduce task. */
- private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap<>();
+ private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
/** Reducers addresses. */
private T[] reduceAddrs;
@@ -98,7 +98,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @param locReducers Reducers will work on current node.
* @throws IgniteCheckedException If error.
*/
- public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem,
+ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
this.locReduceAddr = locReduceAddr;
this.job = job;
@@ -107,7 +107,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
if (!F.isEmpty(locReducers)) {
for (int rdc : locReducers) {
- GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, job.id(), rdc, 0, null);
+ GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
reducersCtx.put(rdc, job.getTaskContext(taskInfo));
}
@@ -204,7 +204,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
assert msg.buffer() != null;
assert msg.offset() > 0;
- GridHadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+ HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
@@ -487,7 +487,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @return Output.
* @throws IgniteCheckedException If failed.
*/
- public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
switch (taskCtx.taskInfo().type()) {
case MAP:
assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined.";
@@ -506,7 +506,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
switch (taskCtx.taskInfo().type()) {
case REDUCE:
int reducer = taskCtx.taskInfo().taskNumber();
@@ -516,7 +516,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
if (m != null)
return m.input(taskCtx);
- return new GridHadoopTaskInput() { // Empty input.
+ return new HadoopTaskInput() { // Empty input.
@Override public boolean next() {
return false;
}
@@ -542,21 +542,21 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/**
* Partitioned output.
*/
- private class PartitionedOutput implements GridHadoopTaskOutput {
+ private class PartitionedOutput implements HadoopTaskOutput {
/** */
- private final GridHadoopTaskOutput[] adders = new GridHadoopTaskOutput[maps.length()];
+ private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
/** */
- private GridHadoopPartitioner partitioner;
+ private HadoopPartitioner partitioner;
/** */
- private final GridHadoopTaskContext taskCtx;
+ private final HadoopTaskContext taskCtx;
/**
* Constructor.
* @param taskCtx Task context.
*/
- private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
this.taskCtx = taskCtx;
if (needPartitioner)
@@ -574,7 +574,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
throw new IgniteCheckedException("Invalid partition: " + part);
}
- GridHadoopTaskOutput out = adders[part];
+ HadoopTaskOutput out = adders[part];
if (out == null)
adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
@@ -584,7 +584,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
- for (GridHadoopTaskOutput adder : adders) {
+ for (HadoopTaskOutput adder : adders) {
if (adder != null)
adder.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
index 46d8bc9..82da910 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
@@ -87,7 +87,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
* @return Adder object.
* @param ctx Task context.
*/
- @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
if (inputs.get() != 0)
throw new IllegalStateException("Active inputs.");
@@ -162,7 +162,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
}
/** {@inheritDoc} */
- @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
inputs.incrementAndGet();
if (!adders.isEmpty())
@@ -369,7 +369,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
* @param ctx Task context.
* @throws IgniteCheckedException If failed.
*/
- private AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
super(ctx);
keyReader = new Reader(keySer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
index 15b93c6..fcf8e17 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
@@ -46,7 +46,7 @@ public class HadoopHashMultimap extends HadoopHashMultimapBase {
}
/** {@inheritDoc} */
- @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
return new AdderImpl(ctx);
}
@@ -103,7 +103,7 @@ public class HadoopHashMultimap extends HadoopHashMultimapBase {
* @param ctx Task context.
* @throws IgniteCheckedException If failed.
*/
- protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
super(ctx);
keyReader = new Reader(keySer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
index f62a354..c464fd1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
@@ -41,7 +41,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
}
/** {@inheritDoc} */
- @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return new Input(taskCtx);
}
@@ -120,7 +120,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
/**
* @param ser Serialization.
*/
- protected Reader(GridHadoopSerialization ser) {
+ protected Reader(HadoopSerialization ser) {
super(ser);
}
@@ -143,7 +143,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
/**
* Task input.
*/
- protected class Input implements GridHadoopTaskInput {
+ protected class Input implements HadoopTaskInput {
/** */
private int idx = -1;
@@ -163,7 +163,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
* @param taskCtx Task context.
* @throws IgniteCheckedException If failed.
*/
- public Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
cap = capacity();
keyReader = new Reader(taskCtx.keySerialization());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
index e1fa1f1..5def6d3 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
@@ -42,14 +42,14 @@ public interface HadoopMultimap extends AutoCloseable {
* @return Adder.
* @throws IgniteCheckedException If failed.
*/
- public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException;
+ public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException;
/**
* @param taskCtx Task context.
* @return Task input.
* @throws IgniteCheckedException If failed.
*/
- public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx)
+ public HadoopTaskInput input(HadoopTaskContext taskCtx)
throws IgniteCheckedException;
/** {@inheritDoc} */
@@ -58,7 +58,7 @@ public interface HadoopMultimap extends AutoCloseable {
/**
* Adder.
*/
- public interface Adder extends GridHadoopTaskOutput {
+ public interface Adder extends HadoopTaskOutput {
/**
* @param in Data input.
* @param reuse Reusable key.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
index 4aa6e9e..5afcbc9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -110,7 +110,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
private Object tmp;
/** */
- private final GridHadoopSerialization ser;
+ private final HadoopSerialization ser;
/** */
private final HadoopDataInStream in = new HadoopDataInStream(mem);
@@ -118,7 +118,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
/**
* @param ser Serialization.
*/
- protected ReaderBase(GridHadoopSerialization ser) {
+ protected ReaderBase(HadoopSerialization ser) {
assert ser != null;
this.ser = ser;
@@ -172,10 +172,10 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
*/
protected abstract class AdderBase implements Adder {
/** */
- protected final GridHadoopSerialization keySer;
+ protected final HadoopSerialization keySer;
/** */
- protected final GridHadoopSerialization valSer;
+ protected final HadoopSerialization valSer;
/** */
private final HadoopDataOutStream out;
@@ -190,7 +190,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
* @param ctx Task context.
* @throws IgniteCheckedException If failed.
*/
- protected AdderBase(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException {
valSer = ctx.valueSerialization();
keySer = ctx.keySerialization();
@@ -259,7 +259,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
* @return Page pointer.
* @throws IgniteCheckedException If failed.
*/
- protected long write(int off, Object o, GridHadoopSerialization ser) throws IgniteCheckedException {
+ protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException {
writeStart = fixAlignment();
if (off != 0)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
index 07bae6b..c7bcda9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
@@ -93,12 +93,12 @@ public class HadoopSkipList extends HadoopMultimapBase {
}
/** {@inheritDoc} */
- @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
return new AdderImpl(ctx);
}
/** {@inheritDoc} */
- @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
Input in = new Input(taskCtx);
Comparator<Object> grpCmp = taskCtx.groupComparator();
@@ -243,7 +243,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
/**
* @param ser Serialization.
*/
- protected Reader(GridHadoopSerialization ser) {
+ protected Reader(HadoopSerialization ser) {
super(ser);
}
@@ -285,7 +285,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
* @param ctx Task context.
* @throws IgniteCheckedException If failed.
*/
- protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
super(ctx);
keyReader = new Reader(keySer);
@@ -570,7 +570,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
/**
* Task input.
*/
- private class Input implements GridHadoopTaskInput {
+ private class Input implements HadoopTaskInput {
/** */
private long metaPtr = heads;
@@ -584,7 +584,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
* @param taskCtx Task context.
* @throws IgniteCheckedException If failed.
*/
- private Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
keyReader = new Reader(taskCtx.keySerialization());
valReader = new Reader(taskCtx.valueSerialization());
}
@@ -616,7 +616,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
/**
* Grouped input using grouping comparator.
*/
- private class GroupedInput implements GridHadoopTaskInput {
+ private class GroupedInput implements HadoopTaskInput {
/** */
private final Comparator<Object> grpCmp;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
index e217c57..9858e12 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -69,7 +69,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
}
/** {@inheritDoc} */
- @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
+ @Override public void run(final HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
", tasksCnt=" + tasks.size() + ']');
@@ -101,11 +101,11 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
jobTracker.onTaskFinished(info, status);
}
- @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return ctx.shuffle().input(taskCtx);
}
- @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+ @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return ctx.shuffle().output(taskCtx);
}
};
@@ -121,8 +121,8 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
* for this job ID.
* <p>
* It is guaranteed that this method will not be called concurrently with
- * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
- * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
*
* @param jobId Job ID to cancel.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 5b10d6f..4776321 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -28,7 +28,7 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
/**
* Runnable task.
@@ -41,7 +41,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
private final IgniteLogger log;
/** */
- private final GridHadoopJob job;
+ private final HadoopJob job;
/** Task to run. */
private final GridHadoopTaskInfo info;
@@ -59,7 +59,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
private HadoopMultimap combinerInput;
/** */
- private volatile GridHadoopTaskContext ctx;
+ private volatile HadoopTaskContext ctx;
/** Set if task is to cancelling. */
private volatile boolean cancelled;
@@ -74,7 +74,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
* @param info Task info.
* @param nodeId Node id.
*/
- protected HadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info,
+ protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info,
UUID nodeId) {
this.nodeId = nodeId;
this.log = log.getLogger(HadoopRunnableTask.class);
@@ -165,8 +165,8 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
if (cancelled)
throw new HadoopTaskCancelledException("Task cancelled.");
- try (GridHadoopTaskOutput out = createOutputInternal(ctx);
- GridHadoopTaskInput in = createInputInternal(ctx)) {
+ try (HadoopTaskOutput out = createOutputInternal(ctx);
+ HadoopTaskInput in = createInputInternal(ctx)) {
ctx.input(in);
ctx.output(out);
@@ -198,7 +198,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
switch (ctx.taskInfo().type()) {
case SETUP:
case MAP:
@@ -221,21 +221,21 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
* @return Input.
* @throws IgniteCheckedException If failed.
*/
- protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
+ protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
/**
* @param ctx Task info.
* @return Output.
* @throws IgniteCheckedException If failed.
*/
- protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
+ protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
/**
* @param ctx Task info.
* @return Task output.
* @throws IgniteCheckedException If failed.
*/
- private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+ private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
switch (ctx.taskInfo().type()) {
case SETUP:
case REDUCE:
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
index 2da2373..c2002e6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -34,15 +34,15 @@ public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
* @param tasks Tasks.
* @throws IgniteCheckedException If failed.
*/
- public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException;
+ public abstract void run(final HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException;
/**
* Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
* for this job ID.
* <p>
* It is guaranteed that this method will not be called concurrently with
- * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
- * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
*
* @param jobId Job ID to cancel.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index f05761e..db95b2f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -174,7 +174,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
}
else if (ctx.isParticipating(meta)) {
- GridHadoopJob job;
+ HadoopJob job;
try {
job = jobTracker.job(meta.jobId(), meta.jobInfo());
@@ -191,7 +191,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
- @Override public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
+ @Override public void run(final HadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
if (!busyLock.tryReadLock()) {
if (log.isDebugEnabled())
log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
@@ -202,10 +202,10 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
try {
HadoopProcess proc = runningProcsByJobId.get(job.id());
- GridHadoopTaskType taskType = F.first(tasks).type();
+ HadoopTaskType taskType = F.first(tasks).type();
- if (taskType == GridHadoopTaskType.SETUP || taskType == GridHadoopTaskType.ABORT ||
- taskType == GridHadoopTaskType.COMMIT) {
+ if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT ||
+ taskType == HadoopTaskType.COMMIT) {
if (proc == null || proc.terminated()) {
runningProcsByJobId.remove(job.id(), proc);
@@ -269,7 +269,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param job Job instance.
* @param tasks Collection of tasks to execute in started process.
*/
- private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks)
+ private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<GridHadoopTaskInfo> tasks)
throws IgniteCheckedException {
// Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
proc.lock();
@@ -325,7 +325,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param job Job instance.
* @param plan Map reduce plan.
*/
- private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) {
+ private HadoopProcess startProcess(final HadoopJob job, final GridHadoopMapReducePlan plan) {
final UUID childProcId = UUID.randomUUID();
GridHadoopJobId jobId = job.id();
@@ -494,7 +494,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @return Started process.
*/
private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
- GridHadoopJob job) throws Exception {
+ HadoopJob job) throws Exception {
String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
if (log.isDebugEnabled())
@@ -604,7 +604,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param job Job.
* @param plan Map reduce plan.
*/
- private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
+ private void prepareForJob(HadoopProcess proc, HadoopJob job, GridHadoopMapReducePlan plan) {
try {
comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
plan.reducers(), plan.reducers(ctx.localNodeId())));