You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2013/10/11 02:03:28 UTC
svn commit: r1531153 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs:
CHANGES.txt src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
Author: suresh
Date: Fri Oct 11 00:03:28 2013
New Revision: 1531153
URL: http://svn.apache.org/r1531153
Log:
HDFS-5335. Merge 1531152 from trunk
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
- copied unchanged from r1531152, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1531153&r1=1531152&r2=1531153&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 11 00:03:28 2013
@@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
HDFS-5337. should do hsync for a commit request even when there are no
pending writes (brandonli)
+ HDFS-5335. Hive query failed with possible race in dfs output stream.
+ (Haohui Mai via suresh)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1531153&r1=1531152&r2=1531153&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Oct 11 00:03:28 2013
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind;
@@ -85,7 +86,6 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
-import org.mortbay.log.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@@ -141,7 +141,7 @@ public class DFSOutputStream extends FSO
private long bytesCurBlock = 0; // bytes written in current block
private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0;
- private volatile IOException lastException = null;
+ private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
private long artificialSlowdown = 0;
private long lastFlushOffset = 0; // offset when flush was invoked
//persist blocks on namenode
@@ -814,8 +814,8 @@ public class DFSOutputStream extends FSO
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
- lastException = new IOException("Failing write. Tried pipeline " +
- "recovery 5 times without success.");
+ lastException.set(new IOException("Failing write. Tried pipeline " +
+ "recovery 5 times without success."));
streamerClosed = true;
return false;
}
@@ -1005,8 +1005,8 @@ public class DFSOutputStream extends FSO
}
}
if (nodes.length <= 1) {
- lastException = new IOException("All datanodes " + pipelineMsg
- + " are bad. Aborting...");
+ lastException.set(new IOException("All datanodes " + pipelineMsg
+ + " are bad. Aborting..."));
streamerClosed = true;
return false;
}
@@ -1021,7 +1021,7 @@ public class DFSOutputStream extends FSO
newnodes.length-errorIndex);
nodes = newnodes;
hasError = false;
- lastException = null;
+ lastException.set(null);
errorIndex = -1;
}
@@ -1065,7 +1065,7 @@ public class DFSOutputStream extends FSO
ExtendedBlock oldBlock = block;
do {
hasError = false;
- lastException = null;
+ lastException.set(null);
errorIndex = -1;
success = false;
@@ -1279,9 +1279,7 @@ public class DFSOutputStream extends FSO
}
private void setLastException(IOException e) {
- if (lastException == null) {
- lastException = e;
- }
+ lastException.compareAndSet(null, e);
}
}
@@ -1313,7 +1311,7 @@ public class DFSOutputStream extends FSO
protected void checkClosed() throws IOException {
if (closed) {
- IOException e = lastException;
+ IOException e = lastException.get();
throw e != null ? e : new ClosedChannelException();
}
}
@@ -1469,6 +1467,7 @@ public class DFSOutputStream extends FSO
private void waitAndQueueCurrentPacket() throws IOException {
synchronized (dataQueue) {
+ try {
// If queue is full, then wait till we have enough space
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
try {
@@ -1487,6 +1486,8 @@ public class DFSOutputStream extends FSO
}
checkClosed();
queueCurrentPacket();
+ } catch (ClosedChannelException e) {
+ }
}
}
@@ -1735,7 +1736,7 @@ public class DFSOutputStream extends FSO
DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) {
if (!closed) {
- lastException = new IOException("IOException flush:" + e);
+ lastException.set(new IOException("IOException flush:" + e));
closeThreads(true);
}
}
@@ -1793,21 +1794,25 @@ public class DFSOutputStream extends FSO
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
}
- synchronized (dataQueue) {
- while (!closed) {
- checkClosed();
- if (lastAckedSeqno >= seqno) {
- break;
- }
- try {
- dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
- } catch (InterruptedException ie) {
- throw new InterruptedIOException(
- "Interrupted while waiting for data to be acknowledged by pipeline");
+ try {
+ synchronized (dataQueue) {
+ while (!closed) {
+ checkClosed();
+ if (lastAckedSeqno >= seqno) {
+ break;
+ }
+ try {
+ dataQueue.wait(1000); // when we receive an ack, we notify on
+ // dataQueue
+ } catch (InterruptedException ie) {
+ throw new InterruptedIOException(
+ "Interrupted while waiting for data to be acknowledged by pipeline");
+ }
}
}
+ checkClosed();
+ } catch (ClosedChannelException e) {
}
- checkClosed();
}
private synchronized void start() {
@@ -1853,7 +1858,7 @@ public class DFSOutputStream extends FSO
@Override
public synchronized void close() throws IOException {
if (closed) {
- IOException e = lastException;
+ IOException e = lastException.getAndSet(null);
if (e == null)
return;
else
@@ -1880,6 +1885,7 @@ public class DFSOutputStream extends FSO
closeThreads(false);
completeFile(lastBlock);
dfsClient.endFileLease(src);
+ } catch (ClosedChannelException e) {
} finally {
closed = true;
}