You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:25 UTC
[09/31] incubator-ignite git commit: # IGNITE-386: WIP on internal
namings (2).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java
deleted file mode 100644
index 3a55d19..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Child process initialization request carrying job ID, job info, total reducer count and local reducers.
- */
-public class GridHadoopPrepareForJobRequest implements GridHadoopMessage {
- /** Serialization version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- @GridToStringInclude
- private GridHadoopJobId jobId;
-
- /** Job info. */
- @GridToStringInclude
- private GridHadoopJobInfo jobInfo;
-
- /** Total amount of reducers in the job. */
- @GridToStringInclude
- private int totalReducersCnt;
-
- /** Reducers to be executed on current node. */
- @GridToStringInclude
- private int[] locReducers;
-
- /**
- * Constructor required by {@link Externalizable}.
- */
- public GridHadoopPrepareForJobRequest() {
- // No-op.
- }
-
- /**
- * @param jobId Job ID (asserted to be non-null).
- * @param jobInfo Job info.
- * @param totalReducersCnt Total number of reducers in the job.
- * @param locReducers Reducers to be executed on current node.
- */
- public GridHadoopPrepareForJobRequest(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo, int totalReducersCnt,
- int[] locReducers) {
- assert jobId != null;
-
- this.jobId = jobId;
- this.jobInfo = jobInfo;
- this.totalReducersCnt = totalReducersCnt;
- this.locReducers = locReducers;
- }
-
- /**
- * @return Job info.
- */
- public GridHadoopJobInfo jobInfo() {
- return jobInfo;
- }
-
- /**
- * @return Job ID.
- */
- public GridHadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @return Reducers to be executed on current node.
- */
- public int[] localReducers() {
- return locReducers;
- }
-
- /**
- * @return Total number of reducers in the job.
- */
- public int totalReducerCount() {
- return totalReducersCnt;
- }
-
- /** {@inheritDoc} Writes job ID, job info, total reducer count and local reducers, in that order. */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
-
- out.writeObject(jobInfo);
- out.writeInt(totalReducersCnt);
-
- U.writeIntArray(out, locReducers);
- }
-
- /** {@inheritDoc} Reads the fields in the same order in which {@code writeExternal} writes them. */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new GridHadoopJobId();
- jobId.readExternal(in);
-
- jobInfo = (GridHadoopJobInfo)in.readObject();
- totalReducersCnt = in.readInt();
-
- locReducers = U.readIntArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopPrepareForJobRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java
deleted file mode 100644
index 7fc8858..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Process descriptor used to identify process for which task is running.
- */
-public class GridHadoopProcessDescriptor implements Serializable {
- /** Serialization version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Parent node ID. */
- private UUID parentNodeId;
-
- /** Process ID. */
- private UUID procId;
-
- /** Host address. */
- private String addr;
-
- /** TCP port. */
- private int tcpPort;
-
- /** Shared memory port. */
- private int shmemPort;
-
- /**
- * @param parentNodeId Parent node ID.
- * @param procId Process ID.
- */
- public GridHadoopProcessDescriptor(UUID parentNodeId, UUID procId) {
- this.parentNodeId = parentNodeId;
- this.procId = procId;
- }
-
- /**
- * Gets process ID.
- *
- * @return Process ID.
- */
- public UUID processId() {
- return procId;
- }
-
- /**
- * Gets parent node ID.
- *
- * @return Parent node ID.
- */
- public UUID parentNodeId() {
- return parentNodeId;
- }
-
- /**
- * Gets host address.
- *
- * @return Host address.
- */
- public String address() {
- return addr;
- }
-
- /**
- * Sets host address.
- *
- * @param addr Host address.
- */
- public void address(String addr) {
- this.addr = addr;
- }
-
- /**
- * @return Shared memory port.
- */
- public int sharedMemoryPort() {
- return shmemPort;
- }
-
- /**
- * Sets shared memory port.
- *
- * @param shmemPort Shared memory port.
- */
- public void sharedMemoryPort(int shmemPort) {
- this.shmemPort = shmemPort;
- }
-
- /**
- * @return TCP port.
- */
- public int tcpPort() {
- return tcpPort;
- }
-
- /**
- * Sets TCP port.
- *
- * @param tcpPort TCP port.
- */
- public void tcpPort(int tcpPort) {
- this.tcpPort = tcpPort;
- }
-
- /** {@inheritDoc} Compares parent node ID and process ID only; address and ports are not compared. */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (!(o instanceof GridHadoopProcessDescriptor))
- return false;
-
- GridHadoopProcessDescriptor that = (GridHadoopProcessDescriptor)o;
-
- return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId);
- }
-
- /** {@inheritDoc} Computed from parent node ID and process ID only, matching {@code equals}. */
- @Override public int hashCode() {
- int result = parentNodeId.hashCode();
-
- result = 31 * result + procId.hashCode();
-
- return result;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopProcessDescriptor.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java
deleted file mode 100644
index 679da6c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Process started message. Declares no instance fields.
- */
-public class GridHadoopProcessStartedAck implements GridHadoopMessage {
- /** Serialization version UID. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- // No-op: this class has no fields to write.
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- // No-op: this class has no fields to read.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopProcessStartedAck.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java
deleted file mode 100644
index 9f11e0e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Message sent from node to child process to start task(s) execution.
- */
-public class GridHadoopTaskExecutionRequest implements GridHadoopMessage {
- /** Serialization version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- @GridToStringInclude
- private GridHadoopJobId jobId;
-
- /** Job info. */
- @GridToStringInclude
- private GridHadoopJobInfo jobInfo;
-
- /** Tasks. */
- @GridToStringInclude
- private Collection<GridHadoopTaskInfo> tasks;
-
- /**
- * @return Job ID.
- */
- public GridHadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @param jobId Job ID.
- */
- public void jobId(GridHadoopJobId jobId) {
- this.jobId = jobId;
- }
-
- /**
- * @return Job info.
- */
- public GridHadoopJobInfo jobInfo() {
- return jobInfo;
- }
-
- /**
- * @param jobInfo Job info.
- */
- public void jobInfo(GridHadoopJobInfo jobInfo) {
- this.jobInfo = jobInfo;
- }
-
- /**
- * @return Tasks.
- */
- public Collection<GridHadoopTaskInfo> tasks() {
- return tasks;
- }
-
- /**
- * @param tasks Tasks.
- */
- public void tasks(Collection<GridHadoopTaskInfo> tasks) {
- this.tasks = tasks;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopTaskExecutionRequest.class, this);
- }
-
- /** {@inheritDoc} Writes job ID, job info and tasks, in that order. */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
-
- out.writeObject(jobInfo);
- U.writeCollection(out, tasks);
- }
-
- /** {@inheritDoc} Reads the fields in the same order in which {@code writeExternal} writes them. */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new GridHadoopJobId();
- jobId.readExternal(in);
-
- jobInfo = (GridHadoopJobInfo)in.readObject();
- tasks = U.readCollection(in);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java
deleted file mode 100644
index f69abaf..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Task finished message. Sent when local task finishes execution.
- */
-public class GridHadoopTaskFinishedMessage implements GridHadoopMessage {
- /** Serialization version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Finished task info. */
- private GridHadoopTaskInfo taskInfo;
-
- /** Task finish status. */
- private GridHadoopTaskStatus status;
-
- /**
- * Constructor required by {@link Externalizable}.
- */
- public GridHadoopTaskFinishedMessage() {
- // No-op.
- }
-
- /**
- * @param taskInfo Finished task info (asserted to be non-null).
- * @param status Task finish status (asserted to be non-null).
- */
- public GridHadoopTaskFinishedMessage(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status) {
- assert taskInfo != null;
- assert status != null;
-
- this.taskInfo = taskInfo;
- this.status = status;
- }
-
- /**
- * @return Finished task info.
- */
- public GridHadoopTaskInfo taskInfo() {
- return taskInfo;
- }
-
- /**
- * @return Task finish status.
- */
- public GridHadoopTaskStatus status() {
- return status;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridHadoopTaskFinishedMessage.class, this);
- }
-
- /** {@inheritDoc} Writes task info followed by task status. */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- taskInfo.writeExternal(out);
- status.writeExternal(out);
- }
-
- /** {@inheritDoc} Reads task info followed by task status, the order used by {@code writeExternal}. */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- taskInfo = new GridHadoopTaskInfo();
- taskInfo.readExternal(in);
-
- status = new GridHadoopTaskStatus();
- status.readExternal(in);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index 616d383..10ad648 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -39,7 +39,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
/**
* External process registry. Handles external process lifecycle.
@@ -55,7 +55,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
private IgniteLogger log;
/** Node process descriptor. */
- private GridHadoopProcessDescriptor nodeDesc;
+ private HadoopProcessDescriptor nodeDesc;
/** Output base. */
private File outputBase;
@@ -127,7 +127,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
/** {@inheritDoc} */
- @Override public void onJobStateChanged(final GridHadoopJobMetadata meta) {
+ @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
// If we have a local process for this job.
@@ -156,8 +156,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
"[jobId=" + meta.jobId() + ", meta=" + meta + ']');
}
else {
- proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
- @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
try {
f.get();
@@ -223,9 +223,9 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
final HadoopProcess proc0 = proc;
- proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+ proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
@Override public void apply(
- IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
if (!busyLock.tryReadLock())
return;
@@ -281,7 +281,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
return;
}
- GridHadoopTaskExecutionRequest req = new GridHadoopTaskExecutionRequest();
+ HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
req.jobId(job.id());
req.jobInfo(job.info());
@@ -297,8 +297,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
/**
* @return External task metadata.
*/
- private GridHadoopExternalTaskMetadata buildTaskMeta() {
- GridHadoopExternalTaskMetadata meta = new GridHadoopExternalTaskMetadata();
+ private HadoopExternalTaskMetadata buildTaskMeta() {
+ HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata();
meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
@@ -312,8 +312,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param state Fail state.
* @param e Optional error.
*/
- private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) {
- GridHadoopTaskStatus fail = new GridHadoopTaskStatus(state, e);
+ private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
+ HadoopTaskStatus fail = new HadoopTaskStatus(state, e);
for (GridHadoopTaskInfo task : tasks)
jobTracker.onTaskFinished(task, fail);
@@ -351,7 +351,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
try {
- GridHadoopExternalTaskMetadata startMeta = buildTaskMeta();
+ HadoopExternalTaskMetadata startMeta = buildTaskMeta();
if (log.isDebugEnabled())
log.debug("Created hadoop child process metadata for job [job=" + job +
@@ -404,8 +404,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
}, true);
- fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
- @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
try {
// Make sure there were no exceptions.
f.get();
@@ -493,7 +493,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param job Job.
* @return Started process.
*/
- private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta,
+ private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
GridHadoopJob job) throws Exception {
String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
@@ -565,18 +565,18 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param proc Process to send request to.
* @param meta Job metadata.
*/
- private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) {
- Map<Integer, GridHadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
+ private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) {
+ Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
int rdcNum = meta.mapReducePlan().reducers();
- GridHadoopProcessDescriptor[] addrs = null;
+ HadoopProcessDescriptor[] addrs = null;
if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
- addrs = new GridHadoopProcessDescriptor[rdcNum];
+ addrs = new HadoopProcessDescriptor[rdcNum];
for (int i = 0; i < rdcNum; i++) {
- GridHadoopProcessDescriptor desc = rdcAddrs.get(i);
+ HadoopProcessDescriptor desc = rdcAddrs.get(i);
assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
@@ -585,7 +585,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
try {
- comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+ comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
}
catch (IgniteCheckedException e) {
if (!proc.terminated()) {
@@ -606,7 +606,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
*/
private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
try {
- comm.sendMessage(proc.descriptor(), new GridHadoopPrepareForJobRequest(job.id(), job.info(),
+ comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
plan.reducers(), plan.reducers(ctx.localNodeId())));
}
catch (IgniteCheckedException e) {
@@ -623,7 +623,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param desc Remote process descriptor.
* @param taskMsg Task finished message.
*/
- private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) {
+ private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
HadoopProcess proc = runningProcsByProcId.get(desc.processId());
if (proc != null)
@@ -637,12 +637,12 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
*/
private class MessageListener implements GridHadoopMessageListener {
/** {@inheritDoc} */
- @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
+ @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
if (!busyLock.tryReadLock())
return;
try {
- if (msg instanceof GridHadoopProcessStartedAck) {
+ if (msg instanceof HadoopProcessStartedAck) {
HadoopProcess proc = runningProcsByProcId.get(desc.processId());
assert proc != null : "Missing child process for processId: " + desc;
@@ -655,8 +655,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
else
log.warning("Failed to find process start future (will ignore): " + desc);
}
- else if (msg instanceof GridHadoopTaskFinishedMessage) {
- GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg;
+ else if (msg instanceof HadoopTaskFinishedMessage) {
+ HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg;
processTaskFinishedMessage(desc, taskMsg);
}
@@ -669,7 +669,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
/** {@inheritDoc} */
- @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
+ @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
if (!busyLock.tryReadLock())
return;
@@ -689,7 +689,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
if (!F.isEmpty(tasks)) {
log.warning("Lost connection with alive process (will terminate): " + desc);
- GridHadoopTaskStatus status = new GridHadoopTaskStatus(CRASHED,
+ HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
for (GridHadoopTaskInfo info : tasks)
@@ -725,7 +725,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
private final GridHadoopProcessFuture initFut;
/** Process descriptor. */
- private GridHadoopProcessDescriptor procDesc;
+ private HadoopProcessDescriptor procDesc;
/** Reducers planned for this process. */
private Collection<Integer> reducers;
@@ -756,7 +756,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
/**
* @return Communication process descriptor.
*/
- private GridHadoopProcessDescriptor descriptor() {
+ private HadoopProcessDescriptor descriptor() {
return procDesc;
}
@@ -773,7 +773,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
* @param proc Java process representation.
* @param procDesc Process descriptor.
*/
- private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) {
+ private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
this.proc = proc;
this.procDesc = procDesc;
}
@@ -789,9 +789,9 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
terminated = true;
if (!initFut.isDone())
- initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+ initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
@Override public void apply(
- IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+ IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
proc.destroy();
}
});
@@ -852,7 +852,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
/**
*
*/
- private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> {
+ private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
/** */
private static final long serialVersionUID = 0L;
@@ -863,7 +863,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
private GridHadoopJobId jobId;
/** Process descriptor. */
- private GridHadoopProcessDescriptor desc;
+ private HadoopProcessDescriptor desc;
/** Running process. */
private Process proc;
@@ -909,7 +909,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
/**
* Reply received callback.
*/
- public void onReplyReceived(GridHadoopProcessDescriptor desc) {
+ public void onReplyReceived(HadoopProcessDescriptor desc) {
assert childProcId.equals(desc.processId());
this.desc = desc;
@@ -921,7 +921,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
}
/** {@inheritDoc} */
- @Override public boolean onDone(@Nullable IgniteBiTuple<Process, GridHadoopProcessDescriptor> res,
+ @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res,
@Nullable Throwable err) {
if (err == null) {
HadoopProcess proc = runningProcsByProcId.get(childProcId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
new file mode 100644
index 0000000..f0acc9f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * External task metadata (classpath, JVM options) needed to start external process execution.
+ */
+public class HadoopExternalTaskMetadata {
+ /** Process classpath. */
+ private Collection<String> classpath;
+
+ /** JVM options. */
+ @GridToStringInclude
+ private Collection<String> jvmOpts;
+
+ /**
+ * @return JVM options.
+ */
+ public Collection<String> jvmOptions() {
+ return jvmOpts;
+ }
+
+ /**
+ * @param jvmOpts JVM options.
+ */
+ public void jvmOptions(Collection<String> jvmOpts) {
+ this.jvmOpts = jvmOpts;
+ }
+
+ /**
+ * @return Classpath.
+ */
+ public Collection<String> classpath() {
+ return classpath;
+ }
+
+ /**
+ * @param classpath Classpath.
+ */
+ public void classpath(Collection<String> classpath) {
+ this.classpath = classpath;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopExternalTaskMetadata.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
new file mode 100644
index 0000000..1258819
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Job info update request.
+ */
+public class HadoopJobInfoUpdateRequest implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private GridHadoopJobId jobId;
+
+ /** Job phase. */
+ @GridToStringInclude
+ private GridHadoopJobPhase jobPhase;
+
+ /** Reducers addresses. */
+ @GridToStringInclude
+ private HadoopProcessDescriptor[] reducersAddrs;
+
+ /**
+ * Constructor required by {@link Externalizable}.
+ */
+ public HadoopJobInfoUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param jobPhase Job phase.
+ * @param reducersAddrs Reducers addresses.
+ */
+ public HadoopJobInfoUpdateRequest(GridHadoopJobId jobId, GridHadoopJobPhase jobPhase,
+ HadoopProcessDescriptor[] reducersAddrs) {
+ assert jobId != null;
+
+ this.jobId = jobId;
+ this.jobPhase = jobPhase;
+ this.reducersAddrs = reducersAddrs;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Job phase.
+ */
+ public GridHadoopJobPhase jobPhase() {
+ return jobPhase;
+ }
+
+ /**
+ * @return Reducers addresses.
+ */
+ public HadoopProcessDescriptor[] reducersAddresses() {
+ return reducersAddrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeObject(jobPhase);
+ U.writeArray(out, reducersAddrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new GridHadoopJobId();
+ jobId.readExternal(in);
+
+ jobPhase = (GridHadoopJobPhase)in.readObject();
+ reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopJobInfoUpdateRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
new file mode 100644
index 0000000..4037b26
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Child process initialization request.
+ */
+public class HadoopPrepareForJobRequest implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private GridHadoopJobId jobId;
+
+ /** Job info. */
+ @GridToStringInclude
+ private GridHadoopJobInfo jobInfo;
+
+ /** Total number of reducers in the job. */
+ @GridToStringInclude
+ private int totalReducersCnt;
+
+ /** Reducers to be executed on current node. */
+ @GridToStringInclude
+ private int[] locReducers;
+
+ /**
+ * Constructor required by {@link Externalizable}.
+ */
+ public HadoopPrepareForJobRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param jobId Job ID.
+ * @param jobInfo Job info.
+ * @param totalReducersCnt Number of reducers in the job.
+ * @param locReducers Reducers to be executed on current node.
+ */
+ public HadoopPrepareForJobRequest(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo, int totalReducersCnt,
+ int[] locReducers) {
+ assert jobId != null;
+
+ this.jobId = jobId;
+ this.jobInfo = jobInfo;
+ this.totalReducersCnt = totalReducersCnt;
+ this.locReducers = locReducers;
+ }
+
+ /**
+ * @return Job info.
+ */
+ public GridHadoopJobInfo jobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Reducers to be executed on current node.
+ */
+ public int[] localReducers() {
+ return locReducers;
+ }
+
+ /**
+ * @return Number of reducers in job.
+ */
+ public int totalReducerCount() {
+ return totalReducersCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeObject(jobInfo);
+ out.writeInt(totalReducersCnt);
+
+ U.writeIntArray(out, locReducers);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new GridHadoopJobId();
+ jobId.readExternal(in);
+
+ jobInfo = (GridHadoopJobInfo)in.readObject();
+ totalReducersCnt = in.readInt();
+
+ locReducers = U.readIntArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopPrepareForJobRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
new file mode 100644
index 0000000..dea73c3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Process descriptor used to identify process for which task is running.
+ */
+public class HadoopProcessDescriptor implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Parent node ID. */
+ private UUID parentNodeId;
+
+ /** Process ID. */
+ private UUID procId;
+
+ /** Address. */
+ private String addr;
+
+ /** TCP port. */
+ private int tcpPort;
+
+ /** Shared memory port. */
+ private int shmemPort;
+
+ /**
+ * @param parentNodeId Parent node ID.
+ * @param procId Process ID.
+ */
+ public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) {
+ this.parentNodeId = parentNodeId;
+ this.procId = procId;
+ }
+
+ /**
+ * Gets process ID.
+ *
+ * @return Process ID.
+ */
+ public UUID processId() {
+ return procId;
+ }
+
+ /**
+ * Gets parent node ID.
+ *
+ * @return Parent node ID.
+ */
+ public UUID parentNodeId() {
+ return parentNodeId;
+ }
+
+ /**
+ * Gets host address.
+ *
+ * @return Host address.
+ */
+ public String address() {
+ return addr;
+ }
+
+ /**
+ * Sets host address.
+ *
+ * @param addr Host address.
+ */
+ public void address(String addr) {
+ this.addr = addr;
+ }
+
+ /**
+ * @return Shared memory port.
+ */
+ public int sharedMemoryPort() {
+ return shmemPort;
+ }
+
+ /**
+ * Sets shared memory port.
+ *
+ * @param shmemPort Shared memory port.
+ */
+ public void sharedMemoryPort(int shmemPort) {
+ this.shmemPort = shmemPort;
+ }
+
+ /**
+ * @return TCP port.
+ */
+ public int tcpPort() {
+ return tcpPort;
+ }
+
+ /**
+ * Sets TCP port.
+ *
+ * @param tcpPort TCP port.
+ */
+ public void tcpPort(int tcpPort) {
+ this.tcpPort = tcpPort;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof HadoopProcessDescriptor))
+ return false;
+
+ HadoopProcessDescriptor that = (HadoopProcessDescriptor)o;
+
+ return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = parentNodeId.hashCode();
+
+ result = 31 * result + procId.hashCode();
+
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcessDescriptor.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
new file mode 100644
index 0000000..49ff4bf
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Process started message.
+ */
+public class HadoopProcessStartedAck implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopProcessStartedAck.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
new file mode 100644
index 0000000..edf1840
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Message sent from node to child process to start task(s) execution.
+ */
+public class HadoopTaskExecutionRequest implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private GridHadoopJobId jobId;
+
+ /** Job info. */
+ @GridToStringInclude
+ private GridHadoopJobInfo jobInfo;
+
+ /** Tasks. */
+ @GridToStringInclude
+ private Collection<GridHadoopTaskInfo> tasks;
+
+ /**
+ * @return Job ID.
+ */
+ public GridHadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @param jobId Job ID.
+ */
+ public void jobId(GridHadoopJobId jobId) {
+ this.jobId = jobId;
+ }
+
+ /**
+ * @return Job info.
+ */
+ public GridHadoopJobInfo jobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * @param jobInfo Job info.
+ */
+ public void jobInfo(GridHadoopJobInfo jobInfo) {
+ this.jobInfo = jobInfo;
+ }
+
+ /**
+ * @return Tasks.
+ */
+ public Collection<GridHadoopTaskInfo> tasks() {
+ return tasks;
+ }
+
+ /**
+ * @param tasks Tasks.
+ */
+ public void tasks(Collection<GridHadoopTaskInfo> tasks) {
+ this.tasks = tasks;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTaskExecutionRequest.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeObject(jobInfo);
+ U.writeCollection(out, tasks);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new GridHadoopJobId();
+ jobId.readExternal(in);
+
+ jobInfo = (GridHadoopJobInfo)in.readObject();
+ tasks = U.readCollection(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
new file mode 100644
index 0000000..a516f6b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Task finished message. Sent when local task finishes execution.
+ */
+public class HadoopTaskFinishedMessage implements HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Finished task info. */
+ private GridHadoopTaskInfo taskInfo;
+
+ /** Task finish status. */
+ private HadoopTaskStatus status;
+
+ /**
+ * Constructor required by {@link Externalizable}.
+ */
+ public HadoopTaskFinishedMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param taskInfo Finished task info.
+ * @param status Task finish status.
+ */
+ public HadoopTaskFinishedMessage(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status) {
+ assert taskInfo != null;
+ assert status != null;
+
+ this.taskInfo = taskInfo;
+ this.status = status;
+ }
+
+ /**
+ * @return Finished task info.
+ */
+ public GridHadoopTaskInfo taskInfo() {
+ return taskInfo;
+ }
+
+ /**
+ * @return Task finish status.
+ */
+ public HadoopTaskStatus status() {
+ return status;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTaskFinishedMessage.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ taskInfo.writeExternal(out);
+ status.writeExternal(out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ taskInfo = new GridHadoopTaskInfo();
+ taskInfo.readExternal(in);
+
+ status = new HadoopTaskStatus();
+ status.readExternal(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
index 2d00222..21552e2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java
@@ -42,13 +42,13 @@ import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public class GridHadoopChildProcessRunner {
/** Node process descriptor. */
- private GridHadoopProcessDescriptor nodeDesc;
+ private HadoopProcessDescriptor nodeDesc;
/** Message processing executor service. */
private ExecutorService msgExecSvc;
/** Task executor service. */
- private GridHadoopExecutorService execSvc;
+ private HadoopExecutorService execSvc;
/** */
protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
@@ -75,7 +75,7 @@ public class GridHadoopChildProcessRunner {
private final AtomicInteger pendingTasks = new AtomicInteger();
/** Shuffle job. */
- private GridHadoopShuffleJob<GridHadoopProcessDescriptor> shuffleJob;
+ private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
/** Concurrent mappers. */
private int concMappers;
@@ -86,7 +86,7 @@ public class GridHadoopChildProcessRunner {
/**
* Starts child process runner.
*/
- public void start(GridHadoopExternalCommunication comm, GridHadoopProcessDescriptor nodeDesc,
+ public void start(GridHadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
ExecutorService msgExecSvc, IgniteLogger parentLog)
throws IgniteCheckedException {
this.comm = comm;
@@ -99,7 +99,7 @@ public class GridHadoopChildProcessRunner {
startTime = U.currentTimeMillis();
// At this point node knows that this process has started.
- comm.sendMessage(this.nodeDesc, new GridHadoopProcessStartedAck());
+ comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
}
/**
@@ -107,7 +107,7 @@ public class GridHadoopChildProcessRunner {
*
* @param req Initialization request.
*/
- private void prepareProcess(GridHadoopPrepareForJobRequest req) {
+ private void prepareProcess(HadoopPrepareForJobRequest req) {
if (initGuard.compareAndSet(false, true)) {
try {
if (log.isDebugEnabled())
@@ -119,7 +119,7 @@ public class GridHadoopChildProcessRunner {
job.initialize(true, nodeDesc.processId());
- shuffleJob = new GridHadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
+ shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
req.totalReducerCount(), req.localReducers());
initializeExecutors(req);
@@ -143,7 +143,7 @@ public class GridHadoopChildProcessRunner {
/**
* @param req Task execution request.
*/
- private void runTasks(final GridHadoopTaskExecutionRequest req) {
+ private void runTasks(final HadoopTaskExecutionRequest req) {
if (!initFut.isDone() && log.isDebugEnabled())
log.debug("Will wait for process initialization future completion: " + req);
@@ -175,7 +175,7 @@ public class GridHadoopChildProcessRunner {
log.debug("Submitted task for external execution: " + taskInfo);
execSvc.submit(new GridHadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
- @Override protected void onTaskFinished(GridHadoopTaskStatus status) {
+ @Override protected void onTaskFinished(HadoopTaskStatus status) {
onTaskFinished0(this, status);
}
@@ -193,7 +193,7 @@ public class GridHadoopChildProcessRunner {
}
catch (IgniteCheckedException e) {
for (GridHadoopTaskInfo info : req.tasks())
- notifyTaskFinished(info, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
+ notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
}
}
});
@@ -204,13 +204,13 @@ public class GridHadoopChildProcessRunner {
*
* @param req Init child process request.
*/
- private void initializeExecutors(GridHadoopPrepareForJobRequest req) {
+ private void initializeExecutors(HadoopPrepareForJobRequest req) {
int cpus = Runtime.getRuntime().availableProcessors();
//
// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
- execSvc = new GridHadoopExecutorService(log, "", cpus * 2, 1024);
+ execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
}
/**
@@ -218,7 +218,7 @@ public class GridHadoopChildProcessRunner {
*
* @param req Update request.
*/
- private void updateTasks(final GridHadoopJobInfoUpdateRequest req) {
+ private void updateTasks(final HadoopJobInfoUpdateRequest req) {
initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> gridFut) {
assert initGuard.get();
@@ -228,9 +228,9 @@ public class GridHadoopChildProcessRunner {
if (req.reducersAddresses() != null) {
if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
shuffleJob.startSending("external",
- new IgniteInClosure2X<GridHadoopProcessDescriptor, GridHadoopShuffleMessage>() {
- @Override public void applyx(GridHadoopProcessDescriptor dest,
- GridHadoopShuffleMessage msg) throws IgniteCheckedException {
+ new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
+ @Override public void applyx(HadoopProcessDescriptor dest,
+ HadoopShuffleMessage msg) throws IgniteCheckedException {
comm.sendMessage(dest, msg);
}
});
@@ -264,7 +264,7 @@ public class GridHadoopChildProcessRunner {
* @param run Finished task runnable.
* @param status Task status.
*/
- private void onTaskFinished0(GridHadoopRunnableTask run, GridHadoopTaskStatus status) {
+ private void onTaskFinished0(GridHadoopRunnableTask run, HadoopTaskStatus status) {
GridHadoopTaskInfo info = run.taskInfo();
int pendingTasks0 = pendingTasks.decrementAndGet();
@@ -286,10 +286,10 @@ public class GridHadoopChildProcessRunner {
* @param taskInfo Finished task info.
* @param status Task status.
*/
- private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final GridHadoopTaskStatus status,
+ private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status,
boolean flush) {
- final GridHadoopTaskState state = status.state();
+ final HadoopTaskState state = status.state();
final Throwable err = status.failCause();
if (!flush) {
@@ -298,7 +298,7 @@ public class GridHadoopChildProcessRunner {
log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
", err=" + err + ']');
- comm.sendMessage(nodeDesc, new GridHadoopTaskFinishedMessage(taskInfo, status));
+ comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
}
catch (IgniteCheckedException e) {
log.error("Failed to send message to parent node (will terminate child process).", e);
@@ -335,7 +335,7 @@ public class GridHadoopChildProcessRunner {
", state=" + state + ", err=" + err + ']', e);
notifyTaskFinished(taskInfo,
- new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
+ new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
}
}
});
@@ -344,7 +344,7 @@ public class GridHadoopChildProcessRunner {
log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
", state=" + state + ", err=" + err + ']', e);
- notifyTaskFinished(taskInfo, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
+ notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
}
}
}
@@ -356,7 +356,7 @@ public class GridHadoopChildProcessRunner {
* @param msg Received message.
* @return {@code True} if received from parent node.
*/
- private boolean validateNodeMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
+ private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
if (!nodeDesc.processId().equals(desc.processId())) {
log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
", msg=" + msg + ']');
@@ -379,31 +379,31 @@ public class GridHadoopChildProcessRunner {
*/
private class MessageListener implements GridHadoopMessageListener {
/** {@inheritDoc} */
- @Override public void onMessageReceived(final GridHadoopProcessDescriptor desc, final GridHadoopMessage msg) {
- if (msg instanceof GridHadoopTaskExecutionRequest) {
+ @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
+ if (msg instanceof HadoopTaskExecutionRequest) {
if (validateNodeMessage(desc, msg))
- runTasks((GridHadoopTaskExecutionRequest)msg);
+ runTasks((HadoopTaskExecutionRequest)msg);
}
- else if (msg instanceof GridHadoopJobInfoUpdateRequest) {
+ else if (msg instanceof HadoopJobInfoUpdateRequest) {
if (validateNodeMessage(desc, msg))
- updateTasks((GridHadoopJobInfoUpdateRequest)msg);
+ updateTasks((HadoopJobInfoUpdateRequest)msg);
}
- else if (msg instanceof GridHadoopPrepareForJobRequest) {
+ else if (msg instanceof HadoopPrepareForJobRequest) {
if (validateNodeMessage(desc, msg))
- prepareProcess((GridHadoopPrepareForJobRequest)msg);
+ prepareProcess((HadoopPrepareForJobRequest)msg);
}
- else if (msg instanceof GridHadoopShuffleMessage) {
+ else if (msg instanceof HadoopShuffleMessage) {
if (log.isTraceEnabled())
log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> f) {
try {
- GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;
+ HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
shuffleJob.onShuffleMessage(m);
- comm.sendMessage(desc, new GridHadoopShuffleAck(m.id(), m.jobId()));
+ comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
@@ -411,18 +411,18 @@ public class GridHadoopChildProcessRunner {
}
});
}
- else if (msg instanceof GridHadoopShuffleAck) {
+ else if (msg instanceof HadoopShuffleAck) {
if (log.isTraceEnabled())
log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
- shuffleJob.onShuffleAck((GridHadoopShuffleAck)msg);
+ shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
}
else
log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
}
/** {@inheritDoc} */
- @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
+ @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
if (log.isDebugEnabled())
log.debug("Lost connection with remote process: " + desc);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
index 5aeeeee..1216c9a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
@@ -98,7 +98,7 @@ public class GridHadoopExternalProcessStarter {
comm.start();
- GridHadoopProcessDescriptor nodeDesc = new GridHadoopProcessDescriptor(args.nodeId, args.parentProcId);
+ HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId);
nodeDesc.address(args.addr);
nodeDesc.tcpPort(args.tcpPort);
nodeDesc.sharedMemoryPort(args.shmemPort);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
index b375b55..f4eb41a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
@@ -68,5 +68,5 @@ public interface GridHadoopCommunicationClient {
* @param msg Message to send.
* @throws IgniteCheckedException If failed.
*/
- public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws IgniteCheckedException;
+ public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
index f5ddced..937e245 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
@@ -87,10 +87,10 @@ public class GridHadoopExternalCommunication {
public static final boolean DFLT_TCP_NODELAY = true;
/** Server listener. */
- private final GridNioServerListener<GridHadoopMessage> srvLsnr =
- new GridNioServerListenerAdapter<GridHadoopMessage>() {
+ private final GridNioServerListener<HadoopMessage> srvLsnr =
+ new GridNioServerListenerAdapter<HadoopMessage>() {
@Override public void onConnected(GridNioSession ses) {
- GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META);
+ HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
assert desc != null : "Received connected notification without finished handshake: " + ses;
}
@@ -103,7 +103,7 @@ public class GridHadoopExternalCommunication {
if (e != null)
U.error(log, "Session disconnected due to exception: " + ses, e);
- GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META);
+ HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
if (desc != null) {
GridHadoopCommunicationClient rmv = clients.remove(desc.processId());
@@ -120,8 +120,8 @@ public class GridHadoopExternalCommunication {
}
/** {@inheritDoc} */
- @Override public void onMessage(GridNioSession ses, GridHadoopMessage msg) {
- notifyListener(ses.<GridHadoopProcessDescriptor>meta(PROCESS_META), msg);
+ @Override public void onMessage(GridNioSession ses, HadoopMessage msg) {
+ notifyListener(ses.<HadoopProcessDescriptor>meta(PROCESS_META), msg);
if (msgQueueLimit > 0) {
GridNioMessageTracker tracker = ses.meta(TRACKER_META);
@@ -137,7 +137,7 @@ public class GridHadoopExternalCommunication {
private IgniteLogger log;
/** Local process descriptor. */
- private GridHadoopProcessDescriptor locProcDesc;
+ private HadoopProcessDescriptor locProcDesc;
/** Marshaller. */
private Marshaller marsh;
@@ -183,7 +183,7 @@ public class GridHadoopExternalCommunication {
private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
/** NIO server. */
- private GridNioServer<GridHadoopMessage> nioSrvr;
+ private GridNioServer<HadoopMessage> nioSrvr;
/** Shared memory server. */
private IpcSharedMemoryServerEndpoint shmemSrv;
@@ -234,7 +234,7 @@ public class GridHadoopExternalCommunication {
ExecutorService execSvc,
String gridName
) {
- locProcDesc = new GridHadoopProcessDescriptor(parentNodeId, procId);
+ locProcDesc = new HadoopProcessDescriptor(parentNodeId, procId);
this.marsh = marsh;
this.log = log.getLogger(GridHadoopExternalCommunication.class);
@@ -563,7 +563,7 @@ public class GridHadoopExternalCommunication {
*
* @return Local process descriptor.
*/
- public GridHadoopProcessDescriptor localProcessDescriptor() {
+ public HadoopProcessDescriptor localProcessDescriptor() {
return locProcDesc;
}
@@ -587,7 +587,7 @@ public class GridHadoopExternalCommunication {
* @return Server instance.
* @throws IgniteCheckedException Thrown if it's not possible to create server.
*/
- private GridNioServer<GridHadoopMessage> resetNioServer() throws IgniteCheckedException {
+ private GridNioServer<HadoopMessage> resetNioServer() throws IgniteCheckedException {
if (boundTcpPort >= 0)
throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);
@@ -596,8 +596,8 @@ public class GridHadoopExternalCommunication {
// If configured TCP port is busy, find first available in range.
for (int port = locPort; port < locPort + locPortRange; port++) {
try {
- GridNioServer<GridHadoopMessage> srvr =
- GridNioServer.<GridHadoopMessage>builder()
+ GridNioServer<HadoopMessage> srvr =
+ GridNioServer.<HadoopMessage>builder()
.address(locHost)
.port(port)
.listener(srvLsnr)
@@ -722,7 +722,7 @@ public class GridHadoopExternalCommunication {
* @param msg
* @throws IgniteCheckedException
*/
- public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws
+ public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws
IgniteCheckedException {
assert desc != null;
assert msg != null;
@@ -761,7 +761,7 @@ public class GridHadoopExternalCommunication {
* @return The existing or just created client.
* @throws IgniteCheckedException Thrown if any exception occurs.
*/
- private GridHadoopCommunicationClient reserveClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException {
+ private GridHadoopCommunicationClient reserveClient(HadoopProcessDescriptor desc) throws IgniteCheckedException {
assert desc != null;
UUID procId = desc.processId();
@@ -806,7 +806,7 @@ public class GridHadoopExternalCommunication {
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected GridHadoopCommunicationClient createNioClient(GridHadoopProcessDescriptor desc)
+ @Nullable protected GridHadoopCommunicationClient createNioClient(HadoopProcessDescriptor desc)
throws IgniteCheckedException {
assert desc != null;
@@ -837,7 +837,7 @@ public class GridHadoopExternalCommunication {
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected GridHadoopCommunicationClient createShmemClient(GridHadoopProcessDescriptor desc, int port)
+ @Nullable protected GridHadoopCommunicationClient createShmemClient(HadoopProcessDescriptor desc, int port)
throws IgniteCheckedException {
int attempt = 1;
@@ -929,7 +929,7 @@ public class GridHadoopExternalCommunication {
* @return Client.
* @throws IgniteCheckedException If failed.
*/
- protected GridHadoopCommunicationClient createTcpClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException {
+ protected GridHadoopCommunicationClient createTcpClient(HadoopProcessDescriptor desc) throws IgniteCheckedException {
String addr = desc.address();
int port = desc.tcpPort();
@@ -1066,7 +1066,7 @@ public class GridHadoopExternalCommunication {
* @param desc Sender process descriptor.
* @param msg Communication message.
*/
- protected void notifyListener(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
+ protected void notifyListener(HadoopProcessDescriptor desc, HadoopMessage msg) {
GridHadoopMessageListener lsnr = this.lsnr;
if (lsnr != null)
@@ -1135,7 +1135,7 @@ public class GridHadoopExternalCommunication {
private final IpcEndpoint endpoint;
/** Adapter. */
- private GridHadoopIpcToNioAdapter<GridHadoopMessage> adapter;
+ private GridHadoopIpcToNioAdapter<HadoopMessage> adapter;
/**
* @param endpoint Endpoint.
@@ -1279,7 +1279,7 @@ public class GridHadoopExternalCommunication {
/** {@inheritDoc} */
@Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
- GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META);
+ HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
UUID rmtProcId = desc == null ? null : desc.processId();
@@ -1387,12 +1387,12 @@ public class GridHadoopExternalCommunication {
* Process ID message.
*/
@SuppressWarnings("PublicInnerClass")
- public static class ProcessHandshakeMessage implements GridHadoopMessage {
+ public static class ProcessHandshakeMessage implements HadoopMessage {
/** */
private static final long serialVersionUID = 0L;
/** Node ID. */
- private GridHadoopProcessDescriptor procDesc;
+ private HadoopProcessDescriptor procDesc;
/** */
public ProcessHandshakeMessage() {
@@ -1402,14 +1402,14 @@ public class GridHadoopExternalCommunication {
/**
* @param procDesc Process descriptor.
*/
- private ProcessHandshakeMessage(GridHadoopProcessDescriptor procDesc) {
+ private ProcessHandshakeMessage(HadoopProcessDescriptor procDesc) {
this.procDesc = procDesc;
}
/**
* @return Process ID.
*/
- public GridHadoopProcessDescriptor processDescriptor() {
+ public HadoopProcessDescriptor processDescriptor() {
return procDesc;
}
@@ -1420,7 +1420,7 @@ public class GridHadoopExternalCommunication {
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- procDesc = (GridHadoopProcessDescriptor)in.readObject();
+ procDesc = (HadoopProcessDescriptor)in.readObject();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java
index 2a25357..e9dfc92 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java
@@ -55,7 +55,7 @@ public class GridHadoopMarshallerFilter extends GridNioFilterAdapter {
/** {@inheritDoc} */
@Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
- assert msg instanceof GridHadoopMessage : "Invalid message type: " + msg;
+ assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
return proceedSessionWrite(ses, marshaller.marshal(msg));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java
index 219f4db..6010a8d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java
@@ -28,12 +28,12 @@ public interface GridHadoopMessageListener {
* @param desc Process descriptor.
* @param msg Hadoop message.
*/
- public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg);
+ public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg);
/**
* Called when connection to remote process was lost.
*
* @param desc Process descriptor.
*/
- public void onConnectionLost(GridHadoopProcessDescriptor desc);
+ public void onConnectionLost(HadoopProcessDescriptor desc);
}