You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2013/04/25 02:55:01 UTC
svn commit: r1471797 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: CHANGES.txt
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
Author: acmurthy
Date: Thu Apr 25 00:55:00 2013
New Revision: 1471797
URL: http://svn.apache.org/r1471797
Log:
Merge -c 1471796 from trunk to branch-2 to fix MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient synchronization on updates to task Counters. Contributed by Sandy Ryza.
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1471797&r1=1471796&r2=1471797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Apr 25 00:55:00 2013
@@ -188,6 +188,9 @@ Release 2.0.5-beta - UNRELEASED
can override Mapper.run and Reducer.run to get the old (inconsistent)
behaviour. (acmurthy)
+ MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
+ synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1471797&r1=1471796&r2=1471797&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Apr 25 00:55:00 2013
@@ -18,6 +18,10 @@
package org.apache.hadoop.mapred;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -86,8 +90,6 @@ public class LocalJobRunner implements C
private static final String jobDir = "localRunner/";
- private static final Counters EMPTY_COUNTERS = new Counters();
-
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
@@ -273,10 +275,10 @@ public class LocalJobRunner implements C
this.partialMapProgress = new float[numMaps];
this.mapCounters = new Counters[numMaps];
for (int i = 0; i < numMaps; i++) {
- this.mapCounters[i] = EMPTY_COUNTERS;
+ this.mapCounters[i] = new Counters();
}
- this.reduceCounters = EMPTY_COUNTERS;
+ this.reduceCounters = new Counters();
}
/**
@@ -497,6 +499,15 @@ public class LocalJobRunner implements C
public synchronized boolean statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
+ // Serialize as we would if distributed in order to make deep copy
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ taskStatus.write(dos);
+ dos.close();
+ taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
+ taskStatus.readFields(new DataInputStream(
+ new ByteArrayInputStream(baos.toByteArray())));
+
LOG.info(taskStatus.getStateString());
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -525,10 +536,10 @@ public class LocalJobRunner implements C
public synchronized Counters getCurrentCounters() {
if (null == mapCounters) {
// Counters not yet initialized for job.
- return EMPTY_COUNTERS;
+ return new Counters();
}
- Counters current = EMPTY_COUNTERS;
+ Counters current = new Counters();
for (Counters c : mapCounters) {
current = Counters.sum(current, c);
}