You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/10/24 17:47:01 UTC
svn commit: r1401741 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Author: jlowe
Date: Wed Oct 24 15:47:01 2012
New Revision: 1401741
URL: http://svn.apache.org/viewvc?rev=1401741&view=rev
Log:
svn merge -c 1401738 FIXES: MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. Contributed by Vinod Kumar Vavilapalli
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1401741&r1=1401740&r2=1401741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Oct 24 15:47:01 2012
@@ -460,6 +460,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
+ MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
+ (Vinod Kumar Vavilapalli via jlowe)
+
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1401741&r1=1401740&r2=1401741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed Oct 24 15:47:01 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -81,6 +82,7 @@ public class ContainerLauncherImpl exten
protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>();
YarnRPC rpc;
+ private final AtomicBoolean stopped;
private Container getContainer(ContainerLauncherEvent event) {
ContainerId id = event.getContainerID();
@@ -237,6 +239,7 @@ public class ContainerLauncherImpl exten
public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName());
this.context = context;
+ this.stopped = new AtomicBoolean(false);
}
@Override
@@ -271,11 +274,13 @@ public class ContainerLauncherImpl exten
@Override
public void run() {
ContainerLauncherEvent event = null;
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ if (!stopped.get()) {
+ LOG.error("Returning, interrupted : " + e);
+ }
return;
}
int poolSize = launcherPool.getCorePoolSize();
@@ -324,6 +329,10 @@ public class ContainerLauncherImpl exten
}
public void stop() {
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
// shutdown any containers that might be left running
shutdownAllContainers();
eventHandlingThread.interrupt();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1401741&r1=1401740&r2=1401741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Wed Oct 24 15:47:01 2012
@@ -67,7 +67,7 @@ public abstract class RMCommunicator ext
private int rmPollInterval;//millis
protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
- private AtomicBoolean stopped;
+ private final AtomicBoolean stopped;
protected Thread allocatorThread;
@SuppressWarnings("rawtypes")
protected EventHandler eventHandler;
@@ -239,7 +239,9 @@ public abstract class RMCommunicator ext
// TODO: for other exceptions
}
} catch (InterruptedException e) {
- LOG.warn("Allocated thread interrupted. Returning.");
+ if (!stopped.get()) {
+ LOG.warn("Allocated thread interrupted. Returning.");
+ }
return;
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1401741&r1=1401740&r2=1401741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Oct 24 15:47:01 2012
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -84,7 +85,7 @@ public class RMContainerAllocator extend
private static final Priority PRIORITY_MAP;
private Thread eventHandlingThread;
- private volatile boolean stopEventHandling;
+ private final AtomicBoolean stopped;
static {
PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
@@ -145,6 +146,7 @@ public class RMContainerAllocator extend
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
+ this.stopped = new AtomicBoolean(false);
}
@Override
@@ -176,11 +178,13 @@ public class RMContainerAllocator extend
ContainerAllocatorEvent event;
- while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ if (!stopped.get()) {
+ LOG.error("Returning, interrupted : " + e);
+ }
return;
}
@@ -234,7 +238,10 @@ public class RMContainerAllocator extend
@Override
public void stop() {
- this.stopEventHandling = true;
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
eventHandlingThread.interrupt();
super.stop();
LOG.info("Final Stats: " + getStat());
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java?rev=1401741&r1=1401740&r2=1401741&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java Wed Oct 24 15:47:01 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,10 +44,12 @@ public class TaskCleanerImpl extends Abs
private Thread eventHandlingThread;
private BlockingQueue<TaskCleanupEvent> eventQueue =
new LinkedBlockingQueue<TaskCleanupEvent>();
+ private final AtomicBoolean stopped;
public TaskCleanerImpl(AppContext context) {
super("TaskCleaner");
this.context = context;
+ this.stopped = new AtomicBoolean(false);
}
public void start() {
@@ -59,11 +62,13 @@ public class TaskCleanerImpl extends Abs
@Override
public void run() {
TaskCleanupEvent event = null;
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
+ if (!stopped.get()) {
+ LOG.error("Returning, interrupted : " + e);
+ }
return;
}
// the events from the queue are handled in parallel
@@ -77,6 +82,10 @@ public class TaskCleanerImpl extends Abs
}
public void stop() {
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
eventHandlingThread.interrupt();
launcherPool.shutdown();
super.stop();