Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/17 07:30:28 UTC
[26/45] hadoop git commit: HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/394ba94c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/394ba94c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/394ba94c
Branch: refs/heads/HDFS-EC
Commit: 394ba94c5d2801fbc5d95c7872eeeede28eed1eb
Parents: aee68b6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Nov 13 12:28:44 2014 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Nov 13 12:28:44 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 61 +++++++++++++++-----
.../hadoop/hdfs/util/ByteArrayManager.java | 15 +++--
.../java/org/apache/hadoop/hdfs/TestHFlush.java | 10 ++--
.../hadoop/hdfs/util/TestByteArrayManager.java | 43 +++++++++-----
5 files changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
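For context, the stuck-waiting bug fixed below comes down to a conditional wakeup: the
manager's release path only called notifyAll() when the allocation count happened to equal
a tracked maximum, so a release at any other count never woke threads blocked in
newByteArray(). Below is a minimal sketch of the bounded-allocation pattern the fix
restores; the class and field names are illustrative, not the Hadoop code.

    /**
     * Minimal sketch (not the Hadoop class) of a bounded allocator:
     * every release() wakes a waiter, unconditionally.
     */
    class BoundedAllocator {
      private final int limit;
      private int count = 0;

      BoundedAllocator(int limit) {
        this.limit = limit;
      }

      synchronized byte[] allocate(int len) throws InterruptedException {
        while (count >= limit) {
          wait();            // block until release() signals
        }
        count++;
        return new byte[len];
      }

      synchronized void release(byte[] array) {
        count--;
        // Notify on every release. The pre-fix code guarded notifyAll()
        // behind a count check, which could miss wakeups and leave
        // allocate() callers waiting forever.
        notify();
      }
    }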
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 858a71a..bd3dcd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -421,6 +421,9 @@ Release 2.7.0 - UNRELEASED
HDFS-6938. Cleanup javac warnings in FSNamesystem (Charles Lamb via wheat9)
+ HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
+ (szetszwo)
+
Release 2.6.0 - 2014-11-15
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 17942f2..51b1006 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -262,7 +262,9 @@ public class DFSOutputStream extends FSOutputSummer
maxChunks = chunksPerPkt;
}
- void writeData(byte[] inarray, int off, int len) {
+ synchronized void writeData(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
if (dataPos + len > buf.length) {
throw new BufferOverflowException();
}
@@ -270,7 +272,9 @@ public class DFSOutputStream extends FSOutputSummer
dataPos += len;
}
- void writeChecksum(byte[] inarray, int off, int len) {
+ synchronized void writeChecksum(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
if (len == 0) {
return;
}
@@ -284,7 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
/**
* Write the full packet, including the header, to the given output stream.
*/
- void writeTo(DataOutputStream stm) throws IOException {
+ synchronized void writeTo(DataOutputStream stm) throws IOException {
+ checkBuffer();
+
final int dataLen = dataPos - dataStart;
final int checksumLen = checksumPos - checksumStart;
final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
@@ -326,7 +332,13 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- private void releaseBuffer(ByteArrayManager bam) {
+ private synchronized void checkBuffer() throws ClosedChannelException {
+ if (buf == null) {
+ throw new ClosedChannelException();
+ }
+ }
+
+ private synchronized void releaseBuffer(ByteArrayManager bam) {
bam.release(buf);
buf = null;
}
@@ -712,7 +724,7 @@ public class DFSOutputStream extends FSOutputSummer
closeResponder(); // close and join
closeStream();
streamerClosed = true;
- closed = true;
+ setClosed();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
@@ -1616,8 +1628,9 @@ public class DFSOutputStream extends FSOutputSummer
return sock;
}
+ @Override
protected void checkClosed() throws IOException {
- if (closed) {
+ if (isClosed()) {
IOException e = lastException.get();
throw e != null ? e : new ClosedChannelException();
}
@@ -1827,7 +1840,7 @@ public class DFSOutputStream extends FSOutputSummer
synchronized (dataQueue) {
try {
// If queue is full, then wait till we have enough space
- while (!closed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
+ while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
try {
dataQueue.wait();
} catch (InterruptedException e) {
@@ -2013,8 +2026,9 @@ public class DFSOutputStream extends FSOutputSummer
// So send an empty sync packet.
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
- } else {
+ } else if (currentPacket != null) {
// just discard the current packet since it is already been sent.
+ currentPacket.releaseBuffer(byteArrayManager);
currentPacket = null;
}
}
@@ -2071,7 +2085,7 @@ public class DFSOutputStream extends FSOutputSummer
} catch (IOException e) {
DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) {
- if (!closed) {
+ if (!isClosed()) {
lastException.set(new IOException("IOException flush:" + e));
closeThreads(true);
}
@@ -2133,7 +2147,7 @@ public class DFSOutputStream extends FSOutputSummer
long begin = Time.monotonicNow();
try {
synchronized (dataQueue) {
- while (!closed) {
+ while (!isClosed()) {
checkClosed();
if (lastAckedSeqno >= seqno) {
break;
@@ -2166,7 +2180,7 @@ public class DFSOutputStream extends FSOutputSummer
* resources associated with this stream.
*/
synchronized void abort() throws IOException {
- if (closed) {
+ if (isClosed()) {
return;
}
streamer.setLastException(new IOException("Lease timeout of "
@@ -2175,6 +2189,25 @@ public class DFSOutputStream extends FSOutputSummer
dfsClient.endFileLease(fileId);
}
+ boolean isClosed() {
+ return closed;
+ }
+
+ void setClosed() {
+ closed = true;
+ synchronized (dataQueue) {
+ releaseBuffer(dataQueue, byteArrayManager);
+ releaseBuffer(ackQueue, byteArrayManager);
+ }
+ }
+
+ private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
+ for(Packet p : packets) {
+ p.releaseBuffer(bam);
+ }
+ packets.clear();
+ }
+
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
private void closeThreads(boolean force) throws IOException {
@@ -2189,7 +2222,7 @@ public class DFSOutputStream extends FSOutputSummer
} finally {
streamer = null;
s = null;
- closed = true;
+ setClosed();
}
}
@@ -2199,7 +2232,7 @@ public class DFSOutputStream extends FSOutputSummer
*/
@Override
public synchronized void close() throws IOException {
- if (closed) {
+ if (isClosed()) {
IOException e = lastException.getAndSet(null);
if (e == null)
return;
@@ -2229,7 +2262,7 @@ public class DFSOutputStream extends FSOutputSummer
dfsClient.endFileLease(fileId);
} catch (ClosedChannelException e) {
} finally {
- closed = true;
+ setClosed();
}
}
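
The DFSOutputStream changes above pair a fail-fast guard (checkBuffer) with buffer release
on close (setClosed), so queued packets hand their arrays back to the ByteArrayManager and
any write racing a close gets ClosedChannelException instead of a raw NullPointerException.
A self-contained sketch of that lifecycle, with illustrative names:

    import java.nio.BufferOverflowException;
    import java.nio.channels.ClosedChannelException;

    /** Illustrative sketch of the packet-buffer lifecycle the diff hardens. */
    class PacketSketch {
      private byte[] buf = new byte[64 * 1024];
      private int dataPos = 0;

      // Fail fast once the buffer has been returned to the manager.
      private synchronized void checkBuffer() throws ClosedChannelException {
        if (buf == null) {
          throw new ClosedChannelException();
        }
      }

      synchronized void writeData(byte[] in, int off, int len)
          throws ClosedChannelException {
        checkBuffer();
        if (dataPos + len > buf.length) {
          throw new BufferOverflowException();
        }
        System.arraycopy(in, off, buf, dataPos, len);
        dataPos += len;
      }

      synchronized void release() {
        buf = null;   // buffer handed back; later writes must fail fast
      }
    }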
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
index 4751e72..ea5e39d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
@@ -200,12 +200,7 @@ public abstract class ByteArrayManager {
debugMessage.get().append(", ").append(this);
}
- if (numAllocated == maxAllocated) {
- if (LOG.isDebugEnabled()) {
- debugMessage.get().append(", notifyAll");
- }
- notifyAll();
- }
+ notify();
numAllocated--;
if (numAllocated < 0) {
// it is possible to drop below 0 since
@@ -346,12 +341,13 @@ public abstract class ByteArrayManager {
* the number of allocated arrays drops to below the capacity.
*
* The byte array allocated by this method must be returned for recycling
- * via the {@link ByteArrayManager#recycle(byte[])} method.
+ * via the {@link Impl#release(byte[])} method.
*
* @return a byte array with length larger than or equal to the given length.
*/
@Override
public byte[] newByteArray(final int arrayLength) throws InterruptedException {
+ Preconditions.checkArgument(arrayLength >= 0);
if (LOG.isDebugEnabled()) {
debugMessage.get().append("allocate(").append(arrayLength).append(")");
}
@@ -375,6 +371,7 @@ public abstract class ByteArrayManager {
}
if (LOG.isDebugEnabled()) {
+ debugMessage.get().append(", return byte[").append(array.length).append("]");
logDebugMessage();
}
return array;
@@ -384,7 +381,9 @@ public abstract class ByteArrayManager {
* Recycle the given byte array.
*
* The byte array may or may not be allocated
- * by the {@link ByteArrayManager#allocate(int)} method.
+ * by the {@link Impl#newByteArray(int)} method.
+ *
+ * This is a non-blocking call.
*/
@Override
public int release(final byte[] array) {
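
The reworded Javadoc pins down the contract: arrays obtained from newByteArray(int) must be
handed back via release(byte[]), and release never blocks. A hypothetical caller might look
like the following; the 4096 length and the wrapper method are assumptions, only the two
manager calls come from the diff.

    import org.apache.hadoop.hdfs.util.ByteArrayManager;

    class CallerSketch {
      // Hypothetical caller; how the manager is constructed is an assumption.
      static void sendOnce(ByteArrayManager bam) throws InterruptedException {
        final byte[] buf = bam.newByteArray(4096); // may block until capacity frees up
        try {
          // ... fill buf and write it out ...
        } finally {
          bam.release(buf);                        // non-blocking; wakes one waiter
        }
      }
    }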
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
index 7b4d2bb..9ada95f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
@@ -340,11 +340,11 @@ public class TestHFlush {
// If we made it past the hflush(), then that means that the ack made it back
// from the pipeline before we got to the wait() call. In that case we should
// still have interrupted status.
- assertTrue(Thread.currentThread().interrupted());
+ assertTrue(Thread.interrupted());
} catch (InterruptedIOException ie) {
System.out.println("Got expected exception during flush");
}
- assertFalse(Thread.currentThread().interrupted());
+ assertFalse(Thread.interrupted());
// Try again to flush should succeed since we no longer have interrupt status
stm.hflush();
@@ -362,11 +362,11 @@ public class TestHFlush {
// If we made it past the close(), then that means that the ack made it back
// from the pipeline before we got to the wait() call. In that case we should
// still have interrupted status.
- assertTrue(Thread.currentThread().interrupted());
+ assertTrue(Thread.interrupted());
} catch (InterruptedIOException ioe) {
System.out.println("Got expected exception during close");
// If we got the exception, we shouldn't have interrupted status anymore.
- assertFalse(Thread.currentThread().interrupted());
+ assertFalse(Thread.interrupted());
// Now do a successful close.
stm.close();
@@ -374,7 +374,7 @@ public class TestHFlush {
// verify that entire file is good
- AppendTestUtil.checkFullFile(fs, p, fileLen,
+ AppendTestUtil.checkFullFile(fs, p, 4,
fileContents, "Failed to deal with thread interruptions");
} finally {
cluster.shutdown();
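
The test fix above replaces Thread.currentThread().interrupted() with Thread.interrupted():
interrupted() is a static method that tests and clears the current thread's interrupt flag,
so spelling it as an instance call compiles but reads as if it were per-instance and
non-clearing. A quick standalone demonstration:

    public class InterruptFlagDemo {
      public static void main(String[] args) {
        Thread.currentThread().interrupt();
        // isInterrupted() only tests the flag:
        System.out.println(Thread.currentThread().isInterrupted()); // true
        // interrupted() is static: it tests AND clears the flag, no matter
        // what instance it is (misleadingly) invoked on.
        System.out.println(Thread.interrupted());                   // true
        System.out.println(Thread.currentThread().isInterrupted()); // false
      }
    }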
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
index 289617a..77a68c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -141,7 +143,7 @@ public class TestByteArrayManager {
{ // recycle half of the arrays
for(int i = 0; i < countThreshold/2; i++) {
- recycler.submit(removeLast(allocator.futures));
+ recycler.submit(removeLast(allocator.futures).get());
}
for(Future<Integer> f : recycler.furtures) {
@@ -186,8 +188,8 @@ public class TestByteArrayManager {
}
// recycle an array
- recycler.submit(removeLast(allocator.futures));
- Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
+ recycler.submit(removeLast(allocator.futures).get());
+ Assert.assertEquals(1, removeLast(recycler.furtures).get().intValue());
// check if the thread is unblocked
Thread.sleep(100);
@@ -207,11 +209,11 @@ public class TestByteArrayManager {
}
}
- static <T> T removeLast(List<Future<T>> furtures) throws Exception {
+ static <T> Future<T> removeLast(List<Future<T>> furtures) throws Exception {
return remove(furtures, furtures.size() - 1);
}
- static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
- return furtures.isEmpty()? null: furtures.remove(i).get();
+ static <T> Future<T> remove(List<Future<T>> furtures, int i) throws Exception {
+ return furtures.isEmpty()? null: furtures.remove(i);
}
static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
@@ -320,12 +322,13 @@ public class TestByteArrayManager {
final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
final Thread[] threads = new Thread[runners.length];
- final int num = 1 << 8;
+ final int num = 1 << 10;
for(int i = 0; i < runners.length; i++) {
runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
threads[i] = runners[i].start(num);
}
+ final List<Exception> exceptions = new ArrayList<Exception>();
final Thread randomRecycler = new Thread() {
@Override
public void run() {
@@ -336,10 +339,11 @@ public class TestByteArrayManager {
runners[j].recycle();
} catch (Exception e) {
e.printStackTrace();
- Assert.fail(this + " has " + e);
+ exceptions.add(new Exception(this + " has an exception", e));
}
if ((i & 0xFF) == 0) {
+ LOG.info("randomRecycler sleep, i=" + i);
sleepMs(100);
}
}
@@ -361,6 +365,7 @@ public class TestByteArrayManager {
randomRecycler.start();
randomRecycler.join();
+ Assert.assertTrue(exceptions.isEmpty());
Assert.assertNull(counters.get(0, false));
for(int i = 1; i < runners.length; i++) {
@@ -392,7 +397,7 @@ public class TestByteArrayManager {
}
static class Runner implements Runnable {
- static final int NUM_RUNNERS = 4;
+ static final int NUM_RUNNERS = 5;
static int index2arrayLength(int index) {
return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
@@ -453,16 +458,22 @@ public class TestByteArrayManager {
return f;
}
- byte[] removeFirst() throws Exception {
+ Future<byte[]> removeFirst() throws Exception {
synchronized (arrays) {
return remove(arrays, 0);
}
}
void recycle() throws Exception {
- final byte[] a = removeFirst();
- if (a != null) {
- recycle(a);
+ final Future<byte[]> f = removeFirst();
+ if (f != null) {
+ printf("randomRecycler: ");
+ try {
+ recycle(f.get(10, TimeUnit.MILLISECONDS));
+ } catch(TimeoutException e) {
+ recycle(new byte[maxArrayLength]);
+ printf("timeout, new byte[%d]\n", maxArrayLength);
+ }
}
}
@@ -490,9 +501,9 @@ public class TestByteArrayManager {
submitAllocate();
} else {
try {
- final byte[] a = removeFirst();
- if (a != null) {
- submitRecycle(a);
+ final Future<byte[]> f = removeFirst();
+ if (f != null) {
+ submitRecycle(f.get());
}
} catch (Exception e) {
e.printStackTrace();
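
The test now passes Future<byte[]> around instead of eagerly resolving it, and bounds the
recycler's wait, falling back to a fresh array on timeout so the recycler cannot deadlock
against an allocation that is itself blocked waiting for a recycle. A sketch of that
pattern; the pending queue and field names are assumptions, the 10 ms bound is taken from
the diff.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.hadoop.hdfs.util.ByteArrayManager;

    class RecyclerSketch {
      private final Queue<Future<byte[]>> pending = new ConcurrentLinkedQueue<>();
      private final ByteArrayManager manager;
      private final int maxArrayLength;

      RecyclerSketch(ByteArrayManager manager, int maxArrayLength) {
        this.manager = manager;
        this.maxArrayLength = maxArrayLength;
      }

      /** Recycle one pending array without risking an indefinite block. */
      void recycleOne() throws Exception {
        final Future<byte[]> f = pending.poll();
        if (f == null) {
          return;
        }
        try {
          // Bounded wait: the allocation behind this Future may itself be
          // blocked waiting for a recycle, so don't wait on it forever.
          manager.release(f.get(10, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
          // Break the circular wait by recycling a fresh array instead.
          manager.release(new byte[maxArrayLength]);
        }
      }
    }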