You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 15:14:30 UTC
[12/13] incubator-ignite git commit: # IGNITE-386: Moving core
classes (7).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
new file mode 100644
index 0000000..3da2fb1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Map-reduce job execution plan: tells which nodes run mappers and reducers of a job.
+ */
+public interface HadoopMapReducePlan extends Serializable {
+ /**
+ * Gets collection of file blocks for which mappers should be executed.
+ *
+ * @param nodeId Node ID to check.
+ * @return Collection of file blocks or {@code null} if no mappers should be executed on given node.
+ */
+ @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId);
+
+ /**
+ * Gets reducer IDs that should be started on given node.
+ *
+ * @param nodeId Node ID to check.
+ * @return Array of reducer IDs; may be {@code null} (presumably when no reducers should be started on given node).
+ */
+ @Nullable public int[] reducers(UUID nodeId);
+
+ /**
+ * Gets collection of all node IDs involved in map part of job execution.
+ *
+ * @return Collection of node IDs.
+ */
+ public Collection<UUID> mapperNodeIds();
+
+ /**
+ * Gets collection of all node IDs involved in reduce part of job execution.
+ *
+ * @return Collection of node IDs.
+ */
+ public Collection<UUID> reducerNodeIds();
+
+ /**
+ * Gets overall number of mappers for the job.
+ *
+ * @return Number of mappers.
+ */
+ public int mappers();
+
+ /**
+ * Gets overall number of reducers for the job.
+ *
+ * @return Number of reducers.
+ */
+ public int reducers();
+
+ /**
+ * Gets ID of the node assigned to the given reducer.
+ *
+ * @param reducer Reducer (presumably a reducer ID as returned by {@link #reducers(UUID)}; confirm).
+ * @return Node ID.
+ */
+ public UUID nodeForReducer(int reducer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
new file mode 100644
index 0000000..ab885fe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Map-reduce execution planner: builds a {@link HadoopMapReducePlan} for a job.
+ */
+public interface HadoopMapReducePlanner {
+ /**
+ * Prepares map-reduce execution plan for the given job and topology.
+ *
+ * @param job Job.
+ * @param top Topology (collection of cluster nodes).
+ * @param oldPlan Old plan in case of partial failure, may be {@code null}.
+ * @return Map reduce plan.
+ */
+ public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+ @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java
new file mode 100644
index 0000000..eb84d00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.util.future.*;
+
+/**
+ * No-op Hadoop processor; judging by its error messages, used when Hadoop module is not found in class path.
+ */
+public class HadoopNoopProcessor extends HadoopProcessorAdapter {
+ /**
+ * @param ctx Kernal context.
+ */
+ public HadoopNoopProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} Always throws {@link IllegalStateException} in this implementation. */
+ @Override public Hadoop hadoop() {
+ throw new IllegalStateException("Hadoop module is not found in class path.");
+ }
+
+ /** {@inheritDoc} Always returns {@code null} in this implementation. */
+ @Override public HadoopConfiguration config() {
+ return null;
+ }
+
+ /** {@inheritDoc} Always returns {@code null} in this implementation. */
+ @Override public HadoopJobId nextJobId() {
+ return null;
+ }
+
+ /** {@inheritDoc} Returns a finished future created with an {@link IgniteCheckedException}. */
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+ return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available."));
+ }
+
+ /** {@inheritDoc} Always returns {@code null} in this implementation. */
+ @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} Always returns {@code null} in this implementation. */
+ @Override public HadoopCounters counters(HadoopJobId jobId) {
+ return null;
+ }
+
+ /** {@inheritDoc} Always returns {@code null} in this implementation. */
+ @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} Always returns {@code false} in this implementation. */
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java
new file mode 100644
index 0000000..44ff8be
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+
+/**
+ * Base class for Hadoop processors: declares job submission, status, counters, finish future and kill operations.
+ */
+public abstract class HadoopProcessorAdapter extends GridProcessorAdapter {
+ /**
+ * @param ctx Kernal context.
+ */
+ protected HadoopProcessorAdapter(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * @return Hadoop facade.
+ */
+ public abstract Hadoop hadoop();
+
+ /**
+ * @return Hadoop configuration.
+ */
+ public abstract HadoopConfiguration config();
+
+ /**
+ * @return Generated job ID (a single {@link HadoopJobId}, not a collection).
+ */
+ public abstract HadoopJobId nextJobId();
+
+ /**
+ * Submits job to job tracker.
+ *
+ * @param jobId Job ID to submit.
+ * @param jobInfo Job info to submit.
+ * @return Execution future.
+ */
+ public abstract IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo);
+
+ /**
+ * Gets Hadoop job execution status.
+ *
+ * @param jobId Job ID to get status for.
+ * @return Job execution status.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Returns Hadoop job counters.
+ *
+ * @param jobId Job ID to get counters for.
+ * @return Job counters.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Gets Hadoop job finish future.
+ *
+ * @param jobId Job ID.
+ * @return Job finish future or {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Kills job.
+ *
+ * @param jobId Job ID.
+ * @return {@code True} if job was killed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract boolean kill(HadoopJobId jobId) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java
index c6a409f..3ce83ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java
@@ -26,14 +26,14 @@ import java.io.*;
*/
public abstract class HadoopTask {
/** */
- private GridHadoopTaskInfo taskInfo;
+ private HadoopTaskInfo taskInfo;
/**
* Creates task.
*
* @param taskInfo Task info.
*/
- protected HadoopTask(GridHadoopTaskInfo taskInfo) {
+ protected HadoopTask(HadoopTaskInfo taskInfo) {
assert taskInfo != null;
this.taskInfo = taskInfo;
@@ -52,7 +52,7 @@ public abstract class HadoopTask {
*
* @return Task info.
*/
- public GridHadoopTaskInfo info() {
+ public HadoopTaskInfo info() {
return taskInfo;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 4b66a92..371fd81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
import java.util.*;
@@ -35,13 +36,13 @@ public abstract class HadoopTaskContext {
private HadoopTaskOutput output;
/** */
- private GridHadoopTaskInfo taskInfo;
+ private HadoopTaskInfo taskInfo;
/**
* @param taskInfo Task info.
* @param job Job.
*/
- protected HadoopTaskContext(GridHadoopTaskInfo taskInfo, HadoopJob job) {
+ protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJob job) {
this.taskInfo = taskInfo;
this.job = job;
}
@@ -51,7 +52,7 @@ public abstract class HadoopTaskContext {
*
* @return Task info.
*/
- public GridHadoopTaskInfo taskInfo() {
+ public HadoopTaskInfo taskInfo() {
return taskInfo;
}
@@ -60,7 +61,7 @@ public abstract class HadoopTaskContext {
*
* @param info Task info.
*/
- public void taskInfo(GridHadoopTaskInfo info) {
+ public void taskInfo(HadoopTaskInfo info) {
taskInfo = info;
}
@@ -103,7 +104,7 @@ public abstract class HadoopTaskContext {
*
* @return Unmodifiable collection of counters.
*/
- public abstract GridHadoopCounters counters();
+ public abstract HadoopCounters counters();
/**
* Sets input of the task.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
new file mode 100644
index 0000000..eb82cb4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Task info: task type, job id, task number, attempt and optional input split.
+ */
+public class HadoopTaskInfo implements Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Task type. */
+ private HadoopTaskType type;
+
+ /** Job id. */
+ private HadoopJobId jobId;
+
+ /** Task number. */
+ private int taskNum;
+
+ /** Attempt for this task. */
+ private int attempt;
+
+ /** Input split, may be {@code null}. */
+ private HadoopInputSplit inputSplit;
+
+ /**
+ * Empty constructor for {@link Externalizable}; fields are filled in by {@link #readExternal(ObjectInput)}.
+ */
+ public HadoopTaskInfo() {
+ // No-op.
+ }
+
+ /**
+ * Creates new task info.
+ *
+ * @param type Task type.
+ * @param jobId Job id.
+ * @param taskNum Task number.
+ * @param attempt Attempt for this task.
+ * @param inputSplit Input split, may be {@code null}.
+ */
+ public HadoopTaskInfo(HadoopTaskType type, HadoopJobId jobId, int taskNum, int attempt,
+ @Nullable HadoopInputSplit inputSplit) {
+ this.type = type;
+ this.jobId = jobId;
+ this.taskNum = taskNum;
+ this.attempt = attempt;
+ this.inputSplit = inputSplit;
+ }
+
+ /** {@inheritDoc} Writes type ordinal as one byte, then job id, task number, attempt and input split. */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeByte(type.ordinal());
+ out.writeObject(jobId);
+ out.writeInt(taskNum);
+ out.writeInt(attempt);
+ out.writeObject(inputSplit);
+ }
+
+ /** {@inheritDoc} Reads fields in the same order as {@link #writeExternal(ObjectOutput)} writes them. */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ type = HadoopTaskType.fromOrdinal(in.readByte());
+ jobId = (HadoopJobId)in.readObject();
+ taskNum = in.readInt();
+ attempt = in.readInt();
+ inputSplit = (HadoopInputSplit)in.readObject();
+ }
+
+ /**
+ * @return Task type.
+ */
+ public HadoopTaskType type() {
+ return type;
+ }
+
+ /**
+ * @return Job id.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Task number.
+ */
+ public int taskNumber() {
+ return taskNum;
+ }
+
+ /**
+ * @return Attempt.
+ */
+ public int attempt() {
+ return attempt;
+ }
+
+ /**
+ * @return Input split or {@code null}.
+ */
+ @Nullable public HadoopInputSplit inputSplit() {
+ return inputSplit;
+ }
+
+ /** {@inheritDoc} Compares attempt, task number, job id and type; input split is not compared. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof HadoopTaskInfo))
+ return false;
+
+ HadoopTaskInfo that = (HadoopTaskInfo)o;
+
+ return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type;
+ }
+
+ /** {@inheritDoc} NOTE(review): dereferences {@code type} and {@code jobId} without null checks. */
+ @Override public int hashCode() {
+ int res = type.hashCode();
+
+ res = 31 * res + jobId.hashCode();
+ res = 31 * res + taskNum;
+ res = 31 * res + attempt;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTaskInfo.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
deleted file mode 100644
index caa9194..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
-
-/**
- * Hadoop processor.
- */
-public class IgniteHadoopNoopProcessor extends IgniteHadoopProcessorAdapter {
- /**
- * @param ctx Kernal context.
- */
- public IgniteHadoopNoopProcessor(GridKernalContext ctx) {
- super(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public Hadoop hadoop() {
- throw new IllegalStateException("Hadoop module is not found in class path.");
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopConfiguration config() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobId nextJobId() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
- return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available."));
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopCounters counters(GridHadoopJobId jobId) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
deleted file mode 100644
index d40d5e4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.*;
-
-/**
- * Hadoop processor.
- */
-public abstract class IgniteHadoopProcessorAdapter extends GridProcessorAdapter {
- /**
- * @param ctx Kernal context.
- */
- protected IgniteHadoopProcessorAdapter(GridKernalContext ctx) {
- super(ctx);
- }
-
- /**
- * @return Hadoop facade.
- */
- public abstract Hadoop hadoop();
-
- /**
- * @return Hadoop configuration.
- */
- public abstract GridHadoopConfiguration config();
-
- /**
- * @return Collection of generated IDs.
- */
- public abstract GridHadoopJobId nextJobId();
-
- /**
- * Submits job to job tracker.
- *
- * @param jobId Job ID to submit.
- * @param jobInfo Job info to submit.
- * @return Execution future.
- */
- public abstract IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo);
-
- /**
- * Gets Hadoop job execution status.
- *
- * @param jobId Job ID to get status for.
- * @return Job execution status.
- * @throws IgniteCheckedException If failed.
- */
- public abstract GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException;
-
- /**
- * Returns Hadoop job counters.
- *
- * @param jobId Job ID to get counters for.
- * @return Job counters.
- * @throws IgniteCheckedException If failed.
- */
- public abstract GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException;
-
- /**
- * Gets Hadoop job finish future.
- *
- * @param jobId Job ID.
- * @return Job finish future or {@code null}.
- * @throws IgniteCheckedException If failed.
- */
- public abstract IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException;
-
- /**
- * Kills job.
- *
- * @param jobId Job ID.
- * @return {@code True} if job was killed.
- * @throws IgniteCheckedException If failed.
- */
- public abstract boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java
new file mode 100644
index 0000000..918c3bc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+/**
+ * Hadoop counter identified by a name and a group name.
+ */
+public interface HadoopCounter {
+ /**
+ * Gets name.
+ *
+ * @return Name of the counter.
+ */
+ public String name();
+
+ /**
+ * Gets counter group.
+ *
+ * @return Counter group's name.
+ */
+ public String group();
+
+ /**
+ * Merges the given counter into this counter.
+ *
+ * @param cntr Counter to merge into this counter.
+ */
+ public void merge(HadoopCounter cntr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
new file mode 100644
index 0000000..ce67c57
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * The object that writes some system counters to some storage for each running job. This operation is a part of
+ * the whole statistics collection process.
+ */
+public interface HadoopCounterWriter {
+ /**
+ * Writes counters of the given job to some statistics storage.
+ *
+ * @param jobInfo Job info.
+ * @param jobId Job id.
+ * @param cntrs Counters.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java
new file mode 100644
index 0000000..706ba77
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import java.util.*;
+
+/**
+ * Counters store.
+ */
+public interface HadoopCounters {
+ /**
+ * Returns counter for the specified group and counter name. Creates new if it does not exist.
+ *
+ * @param grp Counter group name.
+ * @param name Counter name.
+ * @param cls Class for new instance creation if it's needed.
+ * @return The counter that was found or added.
+ */
+ <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls);
+
+ /**
+ * Returns all existing counters.
+ *
+ * @return Collection of counters.
+ */
+ Collection<HadoopCounter> all();
+
+ /**
+ * Merges all counters from another store with existing counters.
+ *
+ * @param other Counters to merge with.
+ */
+ void merge(HadoopCounters other);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 01e554c..66e9761 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.util.typedef.*;
import java.io.*;
@@ -48,7 +49,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
/** {@inheritDoc} */
- @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs)
+ @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
throws IgniteCheckedException {
Configuration hadoopCfg = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index 2f484d8..1856e41 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -73,7 +73,7 @@ public class HadoopClassLoader extends URLClassLoader {
* @param cls Class name.
* @return {@code true} if we need to check this class.
*/
- private static boolean isIgfsHadoop(String cls) {
+ private static boolean isHadoopIgfs(String cls) {
String ignitePackagePrefix = "org.apache.ignite";
int len = ignitePackagePrefix.length();
@@ -100,7 +100,7 @@ public class HadoopClassLoader extends URLClassLoader {
return loadClassExplicitly(name, resolve);
}
- if (isIgfsHadoop(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop.
+ if (isHadoopIgfs(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop.
Boolean hasDeps = cache.get(name);
if (hasDeps == null) {
@@ -224,7 +224,7 @@ public class HadoopClassLoader extends URLClassLoader {
if (in == null) // The class is external itself, it must be loaded from this class loader.
return true;
- if (!isIgfsHadoop(clsName)) // Other classes should not have external dependencies.
+ if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies.
return false;
final ClassReader rdr;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
index d897b6c..68f0baf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
import org.apache.ignite.internal.processors.hadoop.shuffle.*;
@@ -34,7 +35,7 @@ public class HadoopContext {
private GridKernalContext ctx;
/** Hadoop configuration. */
- private GridHadoopConfiguration cfg;
+ private HadoopConfiguration cfg;
/** Job tracker. */
private HadoopJobTracker jobTracker;
@@ -53,7 +54,7 @@ public class HadoopContext {
*/
public HadoopContext(
GridKernalContext ctx,
- GridHadoopConfiguration cfg,
+ HadoopConfiguration cfg,
HadoopJobTracker jobTracker,
HadoopTaskExecutorAdapter taskExecutor,
HadoopShuffle shuffle
@@ -89,7 +90,7 @@ public class HadoopContext {
*
* @return Hadoop configuration.
*/
- public GridHadoopConfiguration configuration() {
+ public HadoopConfiguration configuration() {
return cfg;
}
@@ -149,7 +150,7 @@ public class HadoopContext {
if (locNodeId.equals(meta.submitNodeId()))
return true;
- GridHadoopMapReducePlan plan = meta.mapReducePlan();
+ HadoopMapReducePlan plan = meta.mapReducePlan();
return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader();
}
@@ -178,7 +179,7 @@ public class HadoopContext {
/**
* @return Map-reduce planner.
*/
- public GridHadoopMapReducePlanner planner() {
+ public HadoopMapReducePlanner planner() {
return cfg.getMapReducePlanner();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java
deleted file mode 100644
index 8655e14..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.counters.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop +counter group adapter.
- */
-class HadoopCounterGroup implements CounterGroup {
- /** Counters. */
- private final HadoopCounters cntrs;
-
- /** Group name. */
- private final String name;
-
- /**
- * Creates new instance.
- *
- * @param cntrs Client counters instance.
- * @param name Group name.
- */
- HadoopCounterGroup(HadoopCounters cntrs, String name) {
- this.cntrs = cntrs;
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public String getDisplayName() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public void setDisplayName(String displayName) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void addCounter(Counter counter) {
- addCounter(counter.getName(), counter.getDisplayName(), 0);
- }
-
- /** {@inheritDoc} */
- @Override public Counter addCounter(String name, String displayName, long value) {
- final Counter counter = cntrs.findCounter(this.name, name);
-
- counter.setValue(value);
-
- return counter;
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName, String displayName) {
- return cntrs.findCounter(name, counterName);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName, boolean create) {
- return cntrs.findCounter(name, counterName, create);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String counterName) {
- return cntrs.findCounter(name, counterName);
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return cntrs.groupSize(name);
- }
-
- /** {@inheritDoc} */
- @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
- for (final Counter counter : rightGroup)
- cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
- }
-
- /** {@inheritDoc} */
- @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Counter> iterator() {
- return cntrs.iterateGroup(name);
- }
-
- /** {@inheritDoc} */
- @Override public void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
deleted file mode 100644
index 39b9ba6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.counters.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop counters adapter.
- */
-public class HadoopCounters extends Counters {
- /** */
- private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
-
- /**
- * Creates new instance based on given counters.
- *
- * @param cntrs Counters to adapt.
- */
- public HadoopCounters(GridHadoopCounters cntrs) {
- for (HadoopCounter cntr : cntrs.all())
- if (cntr instanceof HadoopLongCounter)
- this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
- return addGroup(grp.getName(), grp.getDisplayName());
- }
-
- /** {@inheritDoc} */
- @Override public CounterGroup addGroup(String name, String displayName) {
- return new HadoopCounterGroup(this, name);
- }
-
- /** {@inheritDoc} */
- @Override public Counter findCounter(String grpName, String cntrName) {
- return findCounter(grpName, cntrName, true);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Counter findCounter(Enum<?> key) {
- return findCounter(key.getDeclaringClass().getName(), key.name(), true);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
- return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
- }
-
- /** {@inheritDoc} */
- @Override public synchronized Iterable<String> getGroupNames() {
- Collection<String> res = new HashSet<>();
-
- for (HadoopCounter counter : cntrs.values())
- res.add(counter.group());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<CounterGroup> iterator() {
- final Iterator<String> iter = getGroupNames().iterator();
-
- return new Iterator<CounterGroup>() {
- @Override public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override public CounterGroup next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- return new HadoopCounterGroup(HadoopCounters.this, iter.next());
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException("not implemented");
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CounterGroup getGroup(String grpName) {
- return new HadoopCounterGroup(this, grpName);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized int countCounters() {
- return cntrs.size();
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(DataOutput out) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void readFields(DataInput in) throws IOException {
- throw new UnsupportedOperationException("not implemented");
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
- for (CounterGroup group : other) {
- for (Counter counter : group) {
- findCounter(group.getName(), counter.getName()).increment(counter.getValue());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object genericRight) {
- if (!(genericRight instanceof HadoopCounters))
- return false;
-
- return cntrs.equals(((HadoopCounters) genericRight).cntrs);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return cntrs.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public void setWriteAllCounters(boolean snd) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean getWriteAllCounters() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Limits limits() {
- return null;
- }
-
- /**
- * Returns size of a group.
- *
- * @param grpName Name of the group.
- * @return amount of counters in the given group.
- */
- public int groupSize(String grpName) {
- int res = 0;
-
- for (HadoopCounter counter : cntrs.values()) {
- if (grpName.equals(counter.group()))
- res++;
- }
-
- return res;
- }
-
- /**
- * Returns counters iterator for specified group.
- *
- * @param grpName Name of the group to iterate.
- * @return Counters iterator.
- */
- public Iterator<Counter> iterateGroup(String grpName) {
- Collection<Counter> grpCounters = new ArrayList<>();
-
- for (HadoopLongCounter counter : cntrs.values()) {
- if (grpName.equals(counter.group()))
- grpCounters.add(new HadoopV2Counter(counter));
- }
-
- return grpCounters.iterator();
- }
-
- /**
- * Find a counter in the group.
- *
- * @param grpName The name of the counter group.
- * @param cntrName The name of the counter.
- * @param create Create the counter if not found if true.
- * @return The counter that was found or added or {@code null} if create is false.
- */
- public Counter findCounter(String grpName, String cntrName, boolean create) {
- T2<String, String> key = new T2<>(grpName, cntrName);
-
- HadoopLongCounter internalCntr = cntrs.get(key);
-
- if (internalCntr == null & create) {
- internalCntr = new HadoopLongCounter(grpName,cntrName);
-
- cntrs.put(key, new HadoopLongCounter(grpName,cntrName));
- }
-
- return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index 438874a..77eb6d2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -29,7 +29,7 @@ import java.util.*;
/**
* Hadoop job info based on default Hadoop configuration.
*/
-public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
+public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
/** */
private static final long serialVersionUID = 5489900236464999951L;
@@ -82,7 +82,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
}
/** {@inheritDoc} */
- @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
+ @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
try {
Class<?> jobCls0 = jobCls;
@@ -96,7 +96,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable {
}
}
- Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, HadoopDefaultJobInfo.class,
+ Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class,
IgniteLogger.class);
return (HadoopJob)constructor.newInstance(jobId, this, log);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
index b4f2c87..27542a1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.util.*;
import org.jetbrains.annotations.*;
@@ -42,12 +44,12 @@ public class HadoopImpl implements Hadoop {
}
/** {@inheritDoc} */
- @Override public GridHadoopConfiguration configuration() {
+ @Override public HadoopConfiguration configuration() {
return proc.config();
}
/** {@inheritDoc} */
- @Override public GridHadoopJobId nextJobId() {
+ @Override public HadoopJobId nextJobId() {
if (busyLock.enterBusy()) {
try {
return proc.nextJobId();
@@ -61,7 +63,7 @@ public class HadoopImpl implements Hadoop {
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
if (busyLock.enterBusy()) {
try {
return proc.submit(jobId, jobInfo);
@@ -75,7 +77,7 @@ public class HadoopImpl implements Hadoop {
}
/** {@inheritDoc} */
- @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return proc.status(jobId);
@@ -89,7 +91,7 @@ public class HadoopImpl implements Hadoop {
}
/** {@inheritDoc} */
- @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return proc.counters(jobId);
@@ -103,7 +105,7 @@ public class HadoopImpl implements Hadoop {
}
/** {@inheritDoc} */
- @Nullable @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return proc.finishFuture(jobId);
@@ -117,7 +119,7 @@ public class HadoopImpl implements Hadoop {
}
/** {@inheritDoc} */
- @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
return proc.kill(jobId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
new file mode 100644
index 0000000..b0c2d3e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.counters.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop counter group adapter.
+ */
+class HadoopMapReduceCounterGroup implements CounterGroup {
+ /** Counters. */
+ private final HadoopMapReduceCounters cntrs;
+
+ /** Group name. */
+ private final String name;
+
+ /**
+ * Creates new instance.
+ *
+ * @param cntrs Client counters instance.
+ * @param name Group name.
+ */
+ HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) {
+ this.cntrs = cntrs;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getDisplayName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setDisplayName(String displayName) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addCounter(Counter counter) {
+ addCounter(counter.getName(), counter.getDisplayName(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter addCounter(String name, String displayName, long value) {
+ final Counter counter = cntrs.findCounter(this.name, name);
+
+ counter.setValue(value);
+
+ return counter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, String displayName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName, boolean create) {
+ return cntrs.findCounter(name, counterName, create);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String counterName) {
+ return cntrs.findCounter(name, counterName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return cntrs.groupSize(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
+ for (final Counter counter : rightGroup)
+ cntrs.findCounter(name, counter.getName()).increment(counter.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroupBase<Counter> getUnderlyingGroup() {
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Counter> iterator() {
+ return cntrs.iterateGroup(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
new file mode 100644
index 0000000..c2c9e2a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.counters.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop counters adapter.
+ */
+public class HadoopMapReduceCounters extends Counters {
+ /** */
+ private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>();
+
+ /**
+ * Creates new instance based on given counters.
+ *
+ * @param cntrs Counters to adapt.
+ */
+ public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) {
+ for (HadoopCounter cntr : cntrs.all())
+ if (cntr instanceof HadoopLongCounter)
+ this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup addGroup(CounterGroup grp) {
+ return addGroup(grp.getName(), grp.getDisplayName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public CounterGroup addGroup(String name, String displayName) {
+ return new HadoopMapReduceCounterGroup(this, name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Counter findCounter(String grpName, String cntrName) {
+ return findCounter(grpName, cntrName, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(Enum<?> key) {
+ return findCounter(key.getDeclaringClass().getName(), key.name(), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) {
+ return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name());
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized Iterable<String> getGroupNames() {
+ Collection<String> res = new HashSet<>();
+
+ for (HadoopCounter counter : cntrs.values())
+ res.add(counter.group());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<CounterGroup> iterator() {
+ final Iterator<String> iter = getGroupNames().iterator();
+
+ return new Iterator<CounterGroup>() {
+ @Override public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override public CounterGroup next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next());
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException("not implemented");
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized CounterGroup getGroup(String grpName) {
+ return new HadoopMapReduceCounterGroup(this, grpName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized int countCounters() {
+ return cntrs.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) {
+ for (CounterGroup group : other) {
+ for (Counter counter : group) {
+ findCounter(group.getName(), counter.getName()).increment(counter.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object genericRight) {
+ if (!(genericRight instanceof HadoopMapReduceCounters))
+ return false;
+
+ return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return cntrs.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setWriteAllCounters(boolean snd) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean getWriteAllCounters() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Limits limits() {
+ return null;
+ }
+
+ /**
+ * Returns size of a group.
+ *
+ * @param grpName Name of the group.
+ * @return amount of counters in the given group.
+ */
+ public int groupSize(String grpName) {
+ int res = 0;
+
+ for (HadoopCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ res++;
+ }
+
+ return res;
+ }
+
+ /**
+ * Returns counters iterator for specified group.
+ *
+ * @param grpName Name of the group to iterate.
+ * @return Counters iterator.
+ */
+ public Iterator<Counter> iterateGroup(String grpName) {
+ Collection<Counter> grpCounters = new ArrayList<>();
+
+ for (HadoopLongCounter counter : cntrs.values()) {
+ if (grpName.equals(counter.group()))
+ grpCounters.add(new HadoopV2Counter(counter));
+ }
+
+ return grpCounters.iterator();
+ }
+
+ /**
+ * Finds a counter in the given group, optionally creating and registering it.
+ *
+ * @param grpName The name of the counter group.
+ * @param cntrName The name of the counter.
+ * @param create Whether to create the counter when it is not found.
+ * @return The counter that was found or added, or {@code null} if it is absent and {@code create} is false.
+ */
+ public Counter findCounter(String grpName, String cntrName, boolean create) {
+ T2<String, String> key = new T2<>(grpName, cntrName);
+
+ HadoopLongCounter internalCntr = cntrs.get(key);
+
+ if (internalCntr == null && create) {
+ internalCntr = new HadoopLongCounter(grpName, cntrName);
+
+ cntrs.put(key, internalCntr); // Store the very instance wrapped below so that updates are retained.
+ }
+
+ return internalCntr == null ? null : new HadoopV2Counter(internalCntr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
index 75e55fd..f17ce66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
import org.apache.ignite.internal.processors.hadoop.planner.*;
import org.apache.ignite.internal.processors.hadoop.shuffle.*;
@@ -35,7 +37,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
/**
* Hadoop processor.
*/
-public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
+public class HadoopProcessor extends HadoopProcessorAdapter {
/** Job ID counter. */
private final AtomicInteger idCtr = new AtomicInteger();
@@ -59,12 +61,12 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
if (ctx.isDaemon())
return;
- GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+ HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
if (cfg == null)
- cfg = new GridHadoopConfiguration();
+ cfg = new HadoopConfiguration();
else
- cfg = new GridHadoopConfiguration(cfg);
+ cfg = new HadoopConfiguration(cfg);
initializeDefaults(cfg);
@@ -167,37 +169,37 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
}
/** {@inheritDoc} */
- @Override public GridHadoopConfiguration config() {
+ @Override public HadoopConfiguration config() {
return hctx.configuration();
}
/** {@inheritDoc} */
- @Override public GridHadoopJobId nextJobId() {
- return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+ @Override public HadoopJobId nextJobId() {
+ return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
+ @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
return hctx.jobTracker().submit(jobId, jobInfo);
}
/** {@inheritDoc} */
- @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
return hctx.jobTracker().status(jobId);
}
/** {@inheritDoc} */
- @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
return hctx.jobTracker().jobCounters(jobId);
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
return hctx.jobTracker().finishFuture(jobId);
}
/** {@inheritDoc} */
- @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
+ @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
return hctx.jobTracker().killJob(jobId);
}
@@ -206,7 +208,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
*
* @param cfg Hadoop configuration.
*/
- private void initializeDefaults(GridHadoopConfiguration cfg) {
+ private void initializeDefaults(HadoopConfiguration cfg) {
if (cfg.getMapReducePlanner() == null)
cfg.setMapReducePlanner(new HadoopDefaultMapReducePlanner());
}
@@ -217,7 +219,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter {
* @param hadoopCfg Hadoop configuration.
* @throws IgniteCheckedException If failed.
*/
- private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException {
+ private void validate(HadoopConfiguration hadoopCfg) throws IgniteCheckedException {
if (ctx.config().isPeerClassLoadingEnabled())
throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
"GridConfiguration.setPeerClassLoadingEnabled()).");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 62b5a98..00be422 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -103,7 +103,7 @@ public class HadoopUtils {
* @param status Ignite job status.
* @return Hadoop job status.
*/
- public static JobStatus status(GridHadoopJobStatus status, Configuration conf) {
+ public static JobStatus status(HadoopJobStatus status, Configuration conf) {
JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
float setupProgress = 0;
@@ -281,7 +281,7 @@ public class HadoopUtils {
* @return Working directory for job.
* @throws IgniteCheckedException If Failed.
*/
- public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws IgniteCheckedException {
+ public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
}
@@ -293,7 +293,7 @@ public class HadoopUtils {
* @return Working directory for task.
* @throws IgniteCheckedException If Failed.
*/
- public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws IgniteCheckedException {
+ public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
File jobLocDir = jobLocalDir(locNodeId, info.jobId());
return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
index 4b96f7d..c2ed5bb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.counter;
-import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
index bfd59ef..78e1c26 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.hadoop.counter;
import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jdk8.backport.*;
@@ -31,7 +30,7 @@ import java.util.concurrent.*;
/**
* Default in-memory counters store.
*/
-public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
+public class HadoopCountersImpl implements HadoopCounters, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -59,7 +58,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
*
* @param cntrs Counters to copy.
*/
- public HadoopCountersImpl(GridHadoopCounters cntrs) {
+ public HadoopCountersImpl(HadoopCounters cntrs) {
this(cntrs.all());
}
@@ -131,7 +130,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable {
}
/** {@inheritDoc} */
- @Override public void merge(GridHadoopCounters other) {
+ @Override public void merge(HadoopCounters other) {
for (HadoopCounter counter : other.all())
counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
index d926706..ce86edb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.counter;
-import org.apache.ignite.internal.processors.hadoop.*;
-
import java.io.*;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
index 6f57ae4..351839a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java
@@ -117,7 +117,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param evtType The type of the event.
* @return String contains necessary event information.
*/
- private String eventName(GridHadoopTaskInfo info, String evtType) {
+ private String eventName(HadoopTaskInfo info, String evtType) {
return eventName(info.type().toString(), info.taskNumber(), evtType);
}
@@ -141,7 +141,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param info Task info.
* @param ts Timestamp of the event.
*/
- public void onTaskSubmit(GridHadoopTaskInfo info, long ts) {
+ public void onTaskSubmit(HadoopTaskInfo info, long ts) {
evts.add(new T2<>(eventName(info, "submit"), ts));
}
@@ -151,7 +151,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param info Task info.
* @param ts Timestamp of the event.
*/
- public void onTaskPrepare(GridHadoopTaskInfo info, long ts) {
+ public void onTaskPrepare(HadoopTaskInfo info, long ts) {
evts.add(new T2<>(eventName(info, "prepare"), ts));
}
@@ -161,7 +161,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param info Task info.
* @param ts Timestamp of the event.
*/
- public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
+ public void onTaskFinish(HadoopTaskInfo info, long ts) {
if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) {
evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
@@ -178,7 +178,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param info Task info.
* @param ts Timestamp of the event.
*/
- public void onTaskStart(GridHadoopTaskInfo info, long ts) {
+ public void onTaskStart(HadoopTaskInfo info, long ts) {
evts.add(new T2<>(eventName(info, "start"), ts));
}
@@ -209,7 +209,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
*
* @param info Job info.
*/
- public void clientSubmissionEvents(GridHadoopJobInfo info) {
+ public void clientSubmissionEvents(HadoopJobInfo info) {
assert nodeId != null;
addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
@@ -224,7 +224,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
* @param info Job info.
* @param propName Property name to get timestamp.
*/
- private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) {
+ private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) {
String val = info.property(propName);
if (!F.isEmpty(val)) {
@@ -253,13 +253,13 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter {
}
/**
- * Gets system predefined performance counter from the GridHadoopCounters object.
+ * Gets system predefined performance counter from the HadoopCounters object.
*
- * @param cntrs GridHadoopCounters object.
+ * @param cntrs HadoopCounters object.
* @param nodeId Node id for methods that adds events. It may be null if you don't use ones.
* @return Predefined performance counter.
*/
- public static HadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) {
+ public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) {
HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class);
if (nodeId != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
index 35fd27c..0c29454 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.hadoop.igfs;
import org.apache.commons.logging.*;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
/**
@@ -26,6 +28,7 @@ import org.jetbrains.annotations.*;
*/
public class HadoopIgfsJclLogger implements IgniteLogger {
/** JCL implementation proxy. */
+ @GridToStringInclude
private Log impl;
/**
@@ -107,6 +110,6 @@ public class HadoopIgfsJclLogger implements IgniteLogger {
/** {@inheritDoc} */
@Override public String toString() {
- return "IgfsHadoopJclLogger [impl=" + impl + ']';
+ return S.toString(HadoopIgfsJclLogger.class, this);
}
}