You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:04 UTC
[72/92] [abbrv] ignite git commit: WIP.
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java
deleted file mode 100644
index 4aa1596..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopPrepareForJobRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Child process initialization request.
- */
-public class HadoopPrepareForJobRequest implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /** Job info. */
- @GridToStringInclude
- private HadoopJobInfo jobInfo;
-
- /** Total amount of reducers in the job. */
- @GridToStringInclude
- private int totalReducersCnt;
-
- /** Reducers to be executed on current node. */
- @GridToStringInclude
- private int[] locReducers;
-
- /**
- * Constructor required by {@link Externalizable}.
- */
- public HadoopPrepareForJobRequest() {
- // No-op.
- }
-
- /**
- * @param jobId Job ID.
- * @param jobInfo Job info.
- * @param totalReducersCnt Number of reducers in the job.
- * @param locReducers Reducers to be executed on current node.
- */
- public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt,
- int[] locReducers) {
- assert jobId != null;
-
- this.jobId = jobId;
- this.jobInfo = jobInfo;
- this.totalReducersCnt = totalReducersCnt;
- this.locReducers = locReducers;
- }
-
- /**
- * @return Job info.
- */
- public HadoopJobInfo jobInfo() {
- return jobInfo;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @return Reducers to be executed on current node.
- */
- public int[] localReducers() {
- return locReducers;
- }
-
- /**
- * @return Number of reducers in job.
- */
- public int totalReducerCount() {
- return totalReducersCnt;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
-
- out.writeObject(jobInfo);
- out.writeInt(totalReducersCnt);
-
- U.writeIntArray(out, locReducers);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
- jobId.readExternal(in);
-
- jobInfo = (HadoopJobInfo)in.readObject();
- totalReducersCnt = in.readInt();
-
- locReducers = U.readIntArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopPrepareForJobRequest.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java
deleted file mode 100644
index 388c7b4..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessDescriptor.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.Serializable;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Process descriptor used to identify process for which task is running.
- */
-public class HadoopProcessDescriptor implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Parent node ID. */
- private UUID parentNodeId;
-
- /** Process ID. */
- private UUID procId;
-
- /** Address. */
- private String addr;
-
- /** TCP port. */
- private int tcpPort;
-
- /** Shared memory port. */
- private int shmemPort;
-
- /**
- * @param parentNodeId Parent node ID.
- * @param procId Process ID.
- */
- public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) {
- this.parentNodeId = parentNodeId;
- this.procId = procId;
- }
-
- /**
- * Gets process ID.
- *
- * @return Process ID.
- */
- public UUID processId() {
- return procId;
- }
-
- /**
- * Gets parent node ID.
- *
- * @return Parent node ID.
- */
- public UUID parentNodeId() {
- return parentNodeId;
- }
-
- /**
- * Gets host address.
- *
- * @return Host address.
- */
- public String address() {
- return addr;
- }
-
- /**
- * Sets host address.
- *
- * @param addr Host address.
- */
- public void address(String addr) {
- this.addr = addr;
- }
-
- /**
- * @return Shared memory port.
- */
- public int sharedMemoryPort() {
- return shmemPort;
- }
-
- /**
- * Sets shared memory port.
- *
- * @param shmemPort Shared memory port.
- */
- public void sharedMemoryPort(int shmemPort) {
- this.shmemPort = shmemPort;
- }
-
- /**
- * @return TCP port.
- */
- public int tcpPort() {
- return tcpPort;
- }
-
- /**
- * Sets TCP port.
- *
- * @param tcpPort TCP port.
- */
- public void tcpPort(int tcpPort) {
- this.tcpPort = tcpPort;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (!(o instanceof HadoopProcessDescriptor))
- return false;
-
- HadoopProcessDescriptor that = (HadoopProcessDescriptor)o;
-
- return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int result = parentNodeId.hashCode();
-
- result = 31 * result + procId.hashCode();
-
- return result;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopProcessDescriptor.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java
deleted file mode 100644
index 81c062e..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopProcessStartedAck.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Process started message.
- */
-public class HadoopProcessStartedAck implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopProcessStartedAck.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java
deleted file mode 100644
index f0a8139..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskExecutionRequest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Message sent from node to child process to start task(s) execution.
- */
-public class HadoopTaskExecutionRequest implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Job ID. */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /** Job info. */
- @GridToStringInclude
- private HadoopJobInfo jobInfo;
-
- /** Mappers. */
- @GridToStringInclude
- private Collection<HadoopTaskInfo> tasks;
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @param jobId Job ID.
- */
- public void jobId(HadoopJobId jobId) {
- this.jobId = jobId;
- }
-
- /**
- * @return Jon info.
- */
- public HadoopJobInfo jobInfo() {
- return jobInfo;
- }
-
- /**
- * @param jobInfo Job info.
- */
- public void jobInfo(HadoopJobInfo jobInfo) {
- this.jobInfo = jobInfo;
- }
-
- /**
- * @return Tasks.
- */
- public Collection<HadoopTaskInfo> tasks() {
- return tasks;
- }
-
- /**
- * @param tasks Tasks.
- */
- public void tasks(Collection<HadoopTaskInfo> tasks) {
- this.tasks = tasks;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopTaskExecutionRequest.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
-
- out.writeObject(jobInfo);
- U.writeCollection(out, tasks);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
- jobId.readExternal(in);
-
- jobInfo = (HadoopJobInfo)in.readObject();
- tasks = U.readCollection(in);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java
deleted file mode 100644
index a4a2a8d..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopTaskFinishedMessage.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Task finished message. Sent when local task finishes execution.
- */
-public class HadoopTaskFinishedMessage implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Finished task info. */
- private HadoopTaskInfo taskInfo;
-
- /** Task finish status. */
- private HadoopTaskStatus status;
-
- /**
- * Constructor required by {@link Externalizable}.
- */
- public HadoopTaskFinishedMessage() {
- // No-op.
- }
-
- /**
- * @param taskInfo Finished task info.
- * @param status Task finish status.
- */
- public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus status) {
- assert taskInfo != null;
- assert status != null;
-
- this.taskInfo = taskInfo;
- this.status = status;
- }
-
- /**
- * @return Finished task info.
- */
- public HadoopTaskInfo taskInfo() {
- return taskInfo;
- }
-
- /**
- * @return Task finish status.
- */
- public HadoopTaskStatus status() {
- return status;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopTaskFinishedMessage.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- taskInfo.writeExternal(out);
- status.writeExternal(out);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- taskInfo = new HadoopTaskInfo();
- taskInfo.readExternal(in);
-
- status = new HadoopTaskStatus();
- status.readExternal(in);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java
deleted file mode 100644
index 96c3ec6..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.child;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffleAck;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffleJob;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopExecutorService;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopRunnableTask;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskState;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.HadoopTaskStatus;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopJobInfoUpdateRequest;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopPrepareForJobRequest;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessStartedAck;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopTaskExecutionRequest;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopTaskFinishedMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopExternalCommunication;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopMessageListener;
-import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE;
-
-/**
- * Hadoop process base.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public class HadoopChildProcessRunner {
- /** Node process descriptor. */
- private HadoopProcessDescriptor nodeDesc;
-
- /** Message processing executor service. */
- private ExecutorService msgExecSvc;
-
- /** Task executor service. */
- private HadoopExecutorService execSvc;
-
- /** */
- protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- /** External communication. */
- private HadoopExternalCommunication comm;
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Init guard. */
- private final AtomicBoolean initGuard = new AtomicBoolean();
-
- /** Start time. */
- private long startTime;
-
- /** Init future. */
- private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
-
- /** Job instance. */
- private HadoopJob job;
-
- /** Number of uncompleted tasks. */
- private final AtomicInteger pendingTasks = new AtomicInteger();
-
- /** Shuffle job. */
- private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
-
- /** Concurrent mappers. */
- private int concMappers;
-
- /** Concurrent reducers. */
- private int concReducers;
-
- /**
- * Starts child process runner.
- */
- public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
- ExecutorService msgExecSvc, IgniteLogger parentLog)
- throws IgniteCheckedException {
- this.comm = comm;
- this.nodeDesc = nodeDesc;
- this.msgExecSvc = msgExecSvc;
-
- comm.setListener(new MessageListener());
- log = parentLog.getLogger(HadoopChildProcessRunner.class);
-
- startTime = U.currentTimeMillis();
-
- // At this point node knows that this process has started.
- comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
- }
-
- /**
- * Initializes process for task execution.
- *
- * @param req Initialization request.
- */
- private void prepareProcess(HadoopPrepareForJobRequest req) {
- if (initGuard.compareAndSet(false, true)) {
- try {
- if (log.isDebugEnabled())
- log.debug("Initializing external hadoop task: " + req);
-
- assert job == null;
-
- job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null, new HadoopHelperImpl());
-
- job.initialize(true, nodeDesc.processId());
-
- shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
- req.totalReducerCount(), req.localReducers());
-
- initializeExecutors(req);
-
- if (log.isDebugEnabled())
- log.debug("External process initialized [initWaitTime=" +
- (U.currentTimeMillis() - startTime) + ']');
-
- initFut.onDone();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to initialize process: " + req, e);
-
- initFut.onDone(e);
- }
- }
- else
- log.warning("Duplicate initialize process request received (will ignore): " + req);
- }
-
- /**
- * @param req Task execution request.
- */
- private void runTasks(final HadoopTaskExecutionRequest req) {
- if (!initFut.isDone() && log.isDebugEnabled())
- log.debug("Will wait for process initialization future completion: " + req);
-
- initFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- try {
- // Make sure init was successful.
- f.get();
-
- boolean set = pendingTasks.compareAndSet(0, req.tasks().size());
-
- assert set;
-
- HadoopTaskInfo info = F.first(req.tasks());
-
- assert info != null;
-
- int size = info.type() == MAP ? concMappers : concReducers;
-
-// execSvc.setCorePoolSize(size);
-// execSvc.setMaximumPoolSize(size);
-
- if (log.isDebugEnabled())
- log.debug("Set executor service size for task type [type=" + info.type() +
- ", size=" + size + ']');
-
- for (HadoopTaskInfo taskInfo : req.tasks()) {
- if (log.isDebugEnabled())
- log.debug("Submitted task for external execution: " + taskInfo);
-
- execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
- @Override protected void onTaskFinished(HadoopTaskStatus status) {
- onTaskFinished0(this, status);
- }
-
- @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx)
- throws IgniteCheckedException {
- return shuffleJob.input(ctx);
- }
-
- @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx)
- throws IgniteCheckedException {
- return shuffleJob.output(ctx);
- }
- });
- }
- }
- catch (IgniteCheckedException e) {
- for (HadoopTaskInfo info : req.tasks())
- notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
- }
- }
- });
- }
-
- /**
- * Creates executor services.
- *
- * @param req Init child process request.
- */
- private void initializeExecutors(HadoopPrepareForJobRequest req) {
- int cpus = Runtime.getRuntime().availableProcessors();
-//
-// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
-// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
-
- execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
- }
-
- /**
- * Updates external process map so that shuffle can proceed with sending messages to reducers.
- *
- * @param req Update request.
- */
- private void updateTasks(final HadoopJobInfoUpdateRequest req) {
- initFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> gridFut) {
- assert initGuard.get();
-
- assert req.jobId().equals(job.id());
-
- if (req.reducersAddresses() != null) {
- if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
- shuffleJob.startSending("external",
- new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
- @Override public void applyx(HadoopProcessDescriptor dest,
- HadoopShuffleMessage msg) throws IgniteCheckedException {
- comm.sendMessage(dest, msg);
- }
- });
- }
- }
- }
- });
- }
-
- /**
- * Stops all executors and running tasks.
- */
- private void shutdown() {
- if (execSvc != null)
- execSvc.shutdown(5000);
-
- if (msgExecSvc != null)
- msgExecSvc.shutdownNow();
-
- try {
- job.dispose(true);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to dispose job.", e);
- }
- }
-
- /**
- * Notifies node about task finish.
- *
- * @param run Finished task runnable.
- * @param status Task status.
- */
- private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) {
- HadoopTaskInfo info = run.taskInfo();
-
- int pendingTasks0 = pendingTasks.decrementAndGet();
-
- if (log.isDebugEnabled())
- log.debug("Hadoop task execution finished [info=" + info
- + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
- ", pendingTasks=" + pendingTasks0 +
- ", err=" + status.failCause() + ']');
-
- assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
-
- boolean flush = pendingTasks0 == 0 && info.type() == MAP;
-
- notifyTaskFinished(info, status, flush);
- }
-
- /**
- * @param taskInfo Finished task info.
- * @param status Task status.
- */
- private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status,
- boolean flush) {
-
- final HadoopTaskState state = status.state();
- final Throwable err = status.failCause();
-
- if (!flush) {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
- ", err=" + err + ']');
-
- comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to send message to parent node (will terminate child process).", e);
-
- shutdown();
-
- terminate();
- }
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
- taskInfo + ", state=" + state + ", err=" + err + ']');
-
- final long start = U.currentTimeMillis();
-
- try {
- shuffleJob.flush().listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- long end = U.currentTimeMillis();
-
- if (log.isDebugEnabled())
- log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
- ", flushTime=" + (end - start) + ']');
-
- try {
- // Check for errors on shuffle.
- f.get();
-
- notifyTaskFinished(taskInfo, status, false);
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
- ", state=" + state + ", err=" + err + ']', e);
-
- notifyTaskFinished(taskInfo,
- new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
- }
- }
- });
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
- ", state=" + state + ", err=" + err + ']', e);
-
- notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
- }
- }
- }
-
- /**
- * Checks if message was received from parent node and prints warning if not.
- *
- * @param desc Sender process ID.
- * @param msg Received message.
- * @return {@code True} if received from parent node.
- */
- private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
- if (!nodeDesc.processId().equals(desc.processId())) {
- log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
- ", msg=" + msg + ']');
-
- return false;
- }
-
- return true;
- }
-
- /**
- * Stops execution of this process.
- */
- private void terminate() {
- System.exit(1);
- }
-
- /**
- * Message listener.
- */
- private class MessageListener implements HadoopMessageListener {
- /** {@inheritDoc} */
- @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
- if (msg instanceof HadoopTaskExecutionRequest) {
- if (validateNodeMessage(desc, msg))
- runTasks((HadoopTaskExecutionRequest)msg);
- }
- else if (msg instanceof HadoopJobInfoUpdateRequest) {
- if (validateNodeMessage(desc, msg))
- updateTasks((HadoopJobInfoUpdateRequest)msg);
- }
- else if (msg instanceof HadoopPrepareForJobRequest) {
- if (validateNodeMessage(desc, msg))
- prepareProcess((HadoopPrepareForJobRequest)msg);
- }
- else if (msg instanceof HadoopShuffleMessage) {
- if (log.isTraceEnabled())
- log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
-
- initFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- try {
- HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
-
- shuffleJob.onShuffleMessage(m);
-
- comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
- }
- }
- });
- }
- else if (msg instanceof HadoopShuffleAck) {
- if (log.isTraceEnabled())
- log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
-
- shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
- }
- else
- log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
- }
-
- /** {@inheritDoc} */
- @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
- if (log.isDebugEnabled())
- log.debug("Lost connection with remote process: " + desc);
-
- if (desc == null)
- U.warn(log, "Handshake failed.");
- else if (desc.processId().equals(nodeDesc.processId())) {
- log.warning("Child process lost connection with parent node (will terminate child process).");
-
- shutdown();
-
- terminate();
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java
deleted file mode 100644
index a2345b0..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/child/HadoopExternalProcessStarter.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.child;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.URL;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication.HadoopExternalCommunication;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.logger.log4j.Log4JLogger;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
-
-/**
- * Hadoop external process base class.
- */
-public class HadoopExternalProcessStarter {
- /** Path to Log4j configuration file. */
- public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
-
- /** Arguments. */
- private Args args;
-
- /** System out. */
- private OutputStream out;
-
- /** System err. */
- private OutputStream err;
-
- /**
- * @param args Parsed arguments.
- */
- public HadoopExternalProcessStarter(Args args) {
- this.args = args;
- }
-
- /**
- * @param cmdArgs Process arguments.
- */
- public static void main(String[] cmdArgs) {
- try {
- Args args = arguments(cmdArgs);
-
- new HadoopExternalProcessStarter(args).run();
- }
- catch (Exception e) {
- System.err.println("Failed");
-
- System.err.println(e.getMessage());
-
- e.printStackTrace(System.err);
- }
- }
-
- /**
- *
- * @throws Exception
- */
- public void run() throws Exception {
- U.setWorkDirectory(args.workDir, U.getIgniteHome());
-
- File outputDir = outputDirectory();
-
- initializeStreams(outputDir);
-
- ExecutorService msgExecSvc = Executors.newFixedThreadPool(
- Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2));
-
- IgniteLogger log = logger(outputDir);
-
- HadoopExternalCommunication comm = new HadoopExternalCommunication(
- args.nodeId,
- args.childProcId,
- new JdkMarshaller(),
- log,
- msgExecSvc,
- "external"
- );
-
- comm.start();
-
- HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId);
- nodeDesc.address(args.addr);
- nodeDesc.tcpPort(args.tcpPort);
- nodeDesc.sharedMemoryPort(args.shmemPort);
-
- HadoopChildProcessRunner runner = new HadoopChildProcessRunner();
-
- runner.start(comm, nodeDesc, msgExecSvc, log);
-
- System.err.println("Started");
- System.err.flush();
-
- System.setOut(new PrintStream(out));
- System.setErr(new PrintStream(err));
- }
-
- /**
- * @param outputDir Directory for process output.
- * @throws Exception
- */
- private void initializeStreams(File outputDir) throws Exception {
- out = new FileOutputStream(new File(outputDir, args.childProcId + ".out"));
- err = new FileOutputStream(new File(outputDir, args.childProcId + ".err"));
- }
-
- /**
- * @return Path to output directory.
- * @throws IOException If failed.
- */
- private File outputDirectory() throws IOException {
- File f = new File(args.out);
-
- if (!f.exists()) {
- if (!f.mkdirs())
- throw new IOException("Failed to create output directory: " + args.out);
- }
- else {
- if (f.isFile())
- throw new IOException("Output directory is a file: " + args.out);
- }
-
- return f;
- }
-
- /**
- * @param outputDir Directory for process output.
- * @return Logger.
- */
- private IgniteLogger logger(final File outputDir) {
- final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
-
- Log4JLogger logger;
-
- try {
- logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true);
- }
- catch (IgniteCheckedException e) {
- System.err.println("Failed to create URL-based logger. Will use default one.");
-
- e.printStackTrace();
-
- logger = new Log4JLogger(true);
- }
-
- logger.updateFilePath(new IgniteClosure<String, String>() {
- @Override public String apply(String s) {
- return new File(outputDir, args.childProcId + ".log").getAbsolutePath();
- }
- });
-
- return logger;
- }
-
- /**
- * @param processArgs Process arguments.
- * @return Child process instance.
- */
- private static Args arguments(String[] processArgs) throws Exception {
- Args args = new Args();
-
- for (int i = 0; i < processArgs.length; i++) {
- String arg = processArgs[i];
-
- switch (arg) {
- case "-cpid": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing process ID for '-cpid' parameter");
-
- String procIdStr = processArgs[++i];
-
- args.childProcId = UUID.fromString(procIdStr);
-
- break;
- }
-
- case "-ppid": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing process ID for '-ppid' parameter");
-
- String procIdStr = processArgs[++i];
-
- args.parentProcId = UUID.fromString(procIdStr);
-
- break;
- }
-
- case "-nid": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing node ID for '-nid' parameter");
-
- String nodeIdStr = processArgs[++i];
-
- args.nodeId = UUID.fromString(nodeIdStr);
-
- break;
- }
-
- case "-addr": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing node address for '-addr' parameter");
-
- args.addr = processArgs[++i];
-
- break;
- }
-
- case "-tport": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing tcp port for '-tport' parameter");
-
- args.tcpPort = Integer.parseInt(processArgs[++i]);
-
- break;
- }
-
- case "-sport": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing shared memory port for '-sport' parameter");
-
- args.shmemPort = Integer.parseInt(processArgs[++i]);
-
- break;
- }
-
- case "-out": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing output folder name for '-out' parameter");
-
- args.out = processArgs[++i];
-
- break;
- }
-
- case "-wd": {
- if (i == processArgs.length - 1)
- throw new Exception("Missing work folder name for '-wd' parameter");
-
- args.workDir = processArgs[++i];
-
- break;
- }
- }
- }
-
- return args;
- }
-
- /**
- * Execution arguments.
- */
- private static class Args {
- /** Process ID. */
- private UUID childProcId;
-
- /** Process ID. */
- private UUID parentProcId;
-
- /** Process ID. */
- private UUID nodeId;
-
- /** Node address. */
- private String addr;
-
- /** TCP port */
- private int tcpPort;
-
- /** Shmem port. */
- private int shmemPort = -1;
-
- /** Output folder. */
- private String out;
-
- /** Work directory. */
- private String workDir;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
deleted file mode 100644
index b421267..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopAbstractCommunicationClient.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Implements basic lifecycle for communication clients.
- */
-public abstract class HadoopAbstractCommunicationClient implements HadoopCommunicationClient {
- /** Time when this client was last used. */
- private volatile long lastUsed = U.currentTimeMillis();
-
- /** Reservations. */
- private final AtomicInteger reserves = new AtomicInteger();
-
- /** {@inheritDoc} */
- @Override public boolean close() {
- return reserves.compareAndSet(0, -1);
- }
-
- /** {@inheritDoc} */
- @Override public void forceClose() {
- reserves.set(-1);
- }
-
- /** {@inheritDoc} */
- @Override public boolean closed() {
- return reserves.get() == -1;
- }
-
- /** {@inheritDoc} */
- @Override public boolean reserve() {
- while (true) {
- int r = reserves.get();
-
- if (r == -1)
- return false;
-
- if (reserves.compareAndSet(r, r + 1))
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void release() {
- while (true) {
- int r = reserves.get();
-
- if (r == -1)
- return;
-
- if (reserves.compareAndSet(r, r - 1))
- return;
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean reserved() {
- return reserves.get() > 0;
- }
-
- /** {@inheritDoc} */
- @Override public long getIdleTime() {
- return U.currentTimeMillis() - lastUsed;
- }
-
- /**
- * Updates used time.
- */
- protected void markUsed() {
- lastUsed = U.currentTimeMillis();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopAbstractCommunicationClient.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/facedf50/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java
deleted file mode 100644
index 623c260..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopCommunicationClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.HadoopProcessDescriptor;
-
/**
 * Communication client used to send messages to a remote Hadoop process.
 * <p>
 * Clients use a reservation-based lifecycle: callers {@link #reserve()} the client before
 * use and {@link #release()} it afterwards; a plain {@link #close()} succeeds only when no
 * reservations are outstanding, while {@link #forceClose()} closes unconditionally.
 */
public interface HadoopCommunicationClient {
    /**
     * Attempts a graceful close of this client.
     *
     * @return {@code True} if client has been closed by this call,
     * {@code false} if failed to close client (due to concurrent reservation or concurrent close).
     */
    public boolean close();

    /**
     * Forces client close regardless of outstanding reservations.
     */
    public void forceClose();

    /**
     * @return {@code True} if client is closed.
     */
    public boolean closed();

    /**
     * Attempts to reserve this client for use; fails once the client has been closed.
     *
     * @return {@code True} if client was reserved, {@code false} otherwise.
     */
    public boolean reserve();

    /**
     * Releases this client by decreasing reservations.
     */
    public void release();

    /**
     * @return {@code True} if client currently has outstanding reservations.
     */
    public boolean reserved();

    /**
     * Gets idle time of this client.
     *
     * @return Time in milliseconds since this client was last used.
     */
    public long getIdleTime();

    /**
     * Sends the given message to the remote process.
     *
     * @param desc Process descriptor.
     * @param msg Message to send.
     * @throws IgniteCheckedException If failed.
     */
    public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws IgniteCheckedException;
}
\ No newline at end of file