Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/17 07:30:28 UTC

[26/45] hadoop git commit: HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.

HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/394ba94c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/394ba94c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/394ba94c

Branch: refs/heads/HDFS-EC
Commit: 394ba94c5d2801fbc5d95c7872eeeede28eed1eb
Parents: aee68b6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Nov 13 12:28:44 2014 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Nov 13 12:28:44 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 61 +++++++++++++++-----
 .../hadoop/hdfs/util/ByteArrayManager.java      | 15 +++--
 .../java/org/apache/hadoop/hdfs/TestHFlush.java | 10 ++--
 .../hadoop/hdfs/util/TestByteArrayManager.java  | 43 +++++++++-----
 5 files changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
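
In short: ByteArrayManager.Impl.release() used to call notifyAll() only when
numAllocated == maxAllocated, so a thread blocked in newByteArray() could miss
its wakeup (the count can drift, e.g. below zero, when arrays that were not
allocated by the manager are recycled) and wait forever; release() now signals
unconditionally. DFSOutputStream had a related leak: packets still queued when
the stream closed never returned their buffers, keeping the manager's count
pinned at the limit. A minimal sketch of the corrected wait/notify pattern;
the class and field names here are illustrative, not the actual implementation:

    // Sketch of a bounded allocator with a fixed per-length limit.
    class BoundedAllocator {
      private final int maxAllocated;
      private int numAllocated = 0;

      BoundedAllocator(int maxAllocated) { this.maxAllocated = maxAllocated; }

      synchronized byte[] allocate(int length) throws InterruptedException {
        while (numAllocated >= maxAllocated) {
          wait();               // woken by release()
        }
        numAllocated++;
        return new byte[length];
      }

      synchronized void release(byte[] array) {
        // Signal unconditionally: a notifyAll() guarded by
        // (numAllocated == maxAllocated) could be skipped while threads
        // were still waiting, which is how clients got stuck.
        notify();
        numAllocated--;
      }
    }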


http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 858a71a..bd3dcd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -421,6 +421,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-6938. Cleanup javac warnings in FSNamesystem (Charles Lamb via wheat9)
 
+    HDFS-7358. Clients may get stuck waiting when using ByteArrayManager.
+    (szetszwo)
+
 Release 2.6.0 - 2014-11-15
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 17942f2..51b1006 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -262,7 +262,9 @@ public class DFSOutputStream extends FSOutputSummer
       maxChunks = chunksPerPkt;
     }
 
-    void writeData(byte[] inarray, int off, int len) {
+    synchronized void writeData(byte[] inarray, int off, int len)
+        throws ClosedChannelException {
+      checkBuffer();
       if (dataPos + len > buf.length) {
         throw new BufferOverflowException();
       }
@@ -270,7 +272,9 @@ public class DFSOutputStream extends FSOutputSummer
       dataPos += len;
     }
 
-    void writeChecksum(byte[] inarray, int off, int len) {
+    synchronized void writeChecksum(byte[] inarray, int off, int len)
+        throws ClosedChannelException {
+      checkBuffer();
       if (len == 0) {
         return;
       }
@@ -284,7 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
     /**
      * Write the full packet, including the header, to the given output stream.
      */
-    void writeTo(DataOutputStream stm) throws IOException {
+    synchronized void writeTo(DataOutputStream stm) throws IOException {
+      checkBuffer();
+
       final int dataLen = dataPos - dataStart;
       final int checksumLen = checksumPos - checksumStart;
       final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
@@ -326,7 +332,13 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
-    private void releaseBuffer(ByteArrayManager bam) {
+    private synchronized void checkBuffer() throws ClosedChannelException {
+      if (buf == null) {
+        throw new ClosedChannelException();
+      }
+    }
+
+    private synchronized void releaseBuffer(ByteArrayManager bam) {
       bam.release(buf);
       buf = null;
     }
@@ -712,7 +724,7 @@ public class DFSOutputStream extends FSOutputSummer
       closeResponder();       // close and join
       closeStream();
       streamerClosed = true;
-      closed = true;
+      setClosed();
       synchronized (dataQueue) {
         dataQueue.notifyAll();
       }
@@ -1616,8 +1628,9 @@ public class DFSOutputStream extends FSOutputSummer
     return sock;
   }
 
+  @Override
   protected void checkClosed() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
@@ -1827,7 +1840,7 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       try {
       // If queue is full, then wait till we have enough space
-      while (!closed && dataQueue.size() + ackQueue.size()  > dfsClient.getConf().writeMaxPackets) {
+      while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
         try {
           dataQueue.wait();
         } catch (InterruptedException e) {
@@ -2013,8 +2026,9 @@ public class DFSOutputStream extends FSOutputSummer
             // So send an empty sync packet.
             currentPacket = createPacket(packetSize, chunksPerPacket,
                 bytesCurBlock, currentSeqno++);
-          } else {
+          } else if (currentPacket != null) {
             // just discard the current packet since it has already been sent.
+            currentPacket.releaseBuffer(byteArrayManager);
             currentPacket = null;
           }
         }
@@ -2071,7 +2085,7 @@ public class DFSOutputStream extends FSOutputSummer
     } catch (IOException e) {
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
-        if (!closed) {
+        if (!isClosed()) {
           lastException.set(new IOException("IOException flush:" + e));
           closeThreads(true);
         }
@@ -2133,7 +2147,7 @@ public class DFSOutputStream extends FSOutputSummer
     long begin = Time.monotonicNow();
     try {
       synchronized (dataQueue) {
-        while (!closed) {
+        while (!isClosed()) {
           checkClosed();
           if (lastAckedSeqno >= seqno) {
             break;
@@ -2166,7 +2180,7 @@ public class DFSOutputStream extends FSOutputSummer
    * resources associated with this stream.
    */
   synchronized void abort() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       return;
     }
     streamer.setLastException(new IOException("Lease timeout of "
@@ -2175,6 +2189,25 @@ public class DFSOutputStream extends FSOutputSummer
     dfsClient.endFileLease(fileId);
   }
 
+  boolean isClosed() {
+    return closed;
+  }
+
+  void setClosed() {
+    closed = true;
+    synchronized (dataQueue) {
+      releaseBuffer(dataQueue, byteArrayManager);
+      releaseBuffer(ackQueue, byteArrayManager);
+    }
+  }
+  
+  private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
+    for(Packet p : packets) {
+      p.releaseBuffer(bam);
+    }
+    packets.clear();
+  }
+
   // shutdown datastreamer and responseprocessor threads.
   // interrupt datastreamer if force is true
   private void closeThreads(boolean force) throws IOException {
@@ -2189,7 +2222,7 @@ public class DFSOutputStream extends FSOutputSummer
     } finally {
       streamer = null;
       s = null;
-      closed = true;
+      setClosed();
     }
   }
   
@@ -2199,7 +2232,7 @@ public class DFSOutputStream extends FSOutputSummer
    */
   @Override
   public synchronized void close() throws IOException {
-    if (closed) {
+    if (isClosed()) {
       IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
@@ -2229,7 +2262,7 @@ public class DFSOutputStream extends FSOutputSummer
       dfsClient.endFileLease(fileId);
     } catch (ClosedChannelException e) {
     } finally {
-      closed = true;
+      setClosed();
     }
   }
 

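Why releasing on close matters: if a stream is torn down while packets are
still queued, their arrays are never handed back to the ByteArrayManager, so
its allocation count never drops back below the limit and a later
newByteArray() call from the same client waits forever. Every path that used
to set closed = true directly now goes through setClosed(), and the Packet
write methods are synchronized and guarded by checkBuffer() so a packet whose
buffer was already released fails with ClosedChannelException rather than a
NullPointerException. Condensed from the diff above (comments added here):

    void setClosed() {
      closed = true;
      synchronized (dataQueue) {
        releaseBuffer(dataQueue, byteArrayManager);  // queued data packets
        releaseBuffer(ackQueue, byteArrayManager);   // unacknowledged packets
      }
    }

    private static void releaseBuffer(List<Packet> packets, ByteArrayManager bam) {
      for (Packet p : packets) {
        p.releaseBuffer(bam);  // decrements the manager's count, signals waiters
      }
      packets.clear();
    }
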
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
index 4751e72..ea5e39d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java
@@ -200,12 +200,7 @@ public abstract class ByteArrayManager {
         debugMessage.get().append(", ").append(this);
       }
 
-      if (numAllocated == maxAllocated) {
-        if (LOG.isDebugEnabled()) {
-          debugMessage.get().append(", notifyAll");
-        }
-        notifyAll();
-      }
+      notify();
       numAllocated--;
       if (numAllocated < 0) {
         // it is possible to drop below 0 since
@@ -346,12 +341,13 @@ public abstract class ByteArrayManager {
      * the number of allocated arrays drops to below the capacity.
      * 
      * The byte array allocated by this method must be returned for recycling
-     * via the {@link ByteArrayManager#recycle(byte[])} method.
+     * via the {@link Impl#release(byte[])} method.
      *
      * @return a byte array with length larger than or equal to the given length.
      */
     @Override
     public byte[] newByteArray(final int arrayLength) throws InterruptedException {
+      Preconditions.checkArgument(arrayLength >= 0);
       if (LOG.isDebugEnabled()) {
         debugMessage.get().append("allocate(").append(arrayLength).append(")");
       }
@@ -375,6 +371,7 @@ public abstract class ByteArrayManager {
       }
   
       if (LOG.isDebugEnabled()) {
+        debugMessage.get().append(", return byte[").append(array.length).append("]");
         logDebugMessage();
       }
       return array;
@@ -384,7 +381,9 @@ public abstract class ByteArrayManager {
      * Recycle the given byte array.
      * 
      * The byte array may or may not be allocated
-     * by the {@link ByteArrayManager#allocate(int)} method.
+     * by the {@link Impl#newByteArray(int)} method.
+     * 
+     * This is a non-blocking call.
      */
     @Override
     public int release(final byte[] array) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
index 7b4d2bb..9ada95f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
@@ -340,11 +340,11 @@ public class TestHFlush {
         // If we made it past the hflush(), then that means that the ack made it back
         // from the pipeline before we got to the wait() call. In that case we should
         // still have interrupted status.
-        assertTrue(Thread.currentThread().interrupted());
+        assertTrue(Thread.interrupted());
       } catch (InterruptedIOException ie) {
         System.out.println("Got expected exception during flush");
       }
-      assertFalse(Thread.currentThread().interrupted());
+      assertFalse(Thread.interrupted());
 
       // Try again to flush should succeed since we no longer have interrupt status
       stm.hflush();
@@ -362,11 +362,11 @@ public class TestHFlush {
         // If we made it past the close(), then that means that the ack made it back
         // from the pipeline before we got to the wait() call. In that case we should
         // still have interrupted status.
-        assertTrue(Thread.currentThread().interrupted());
+        assertTrue(Thread.interrupted());
       } catch (InterruptedIOException ioe) {
         System.out.println("Got expected exception during close");
         // If we got the exception, we shouldn't have interrupted status anymore.
-        assertFalse(Thread.currentThread().interrupted());
+        assertFalse(Thread.interrupted());
 
         // Now do a successful close.
         stm.close();
@@ -374,7 +374,7 @@ public class TestHFlush {
 
 
       // verify that entire file is good
-      AppendTestUtil.checkFullFile(fs, p, fileLen,
+      AppendTestUtil.checkFullFile(fs, p, 4,
         fileContents, "Failed to deal with thread interruptions");
     } finally {
       cluster.shutdown();

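A note on the assertTrue/assertFalse changes above: Thread.interrupted() is a
static method that tests and clears the current thread's interrupt flag, while
the instance method isInterrupted() only tests it. The old form
Thread.currentThread().interrupted() compiled (a static call made through an
instance reference) but read as if it were an instance check. The semantics,
in isolation:

    Thread.currentThread().interrupt();             // set the flag
    assert Thread.currentThread().isInterrupted();  // true; does not clear
    assert Thread.interrupted();                    // true; clears the flag
    assert !Thread.interrupted();                   // flag is now clear
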
http://git-wip-us.apache.org/repos/asf/hadoop/blob/394ba94c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
index 289617a..77a68c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestByteArrayManager.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -141,7 +143,7 @@ public class TestByteArrayManager {
 
       { // recycle half of the arrays
         for(int i = 0; i < countThreshold/2; i++) {
-          recycler.submit(removeLast(allocator.futures));
+          recycler.submit(removeLast(allocator.futures).get());
         }
 
         for(Future<Integer> f : recycler.furtures) {
@@ -186,8 +188,8 @@ public class TestByteArrayManager {
         }
 
         // recycle an array
-        recycler.submit(removeLast(allocator.futures));
-        Assert.assertEquals(1, removeLast(recycler.furtures).intValue());
+        recycler.submit(removeLast(allocator.futures).get());
+        Assert.assertEquals(1, removeLast(recycler.furtures).get().intValue());
 
         // check if the thread is unblocked
         Thread.sleep(100);
@@ -207,11 +209,11 @@ public class TestByteArrayManager {
     }
   }
 
-  static <T> T removeLast(List<Future<T>> furtures) throws Exception {
+  static <T> Future<T> removeLast(List<Future<T>> furtures) throws Exception {
     return remove(furtures, furtures.size() - 1);
   }
-  static <T> T remove(List<Future<T>> furtures, int i) throws Exception {
-    return furtures.isEmpty()? null: furtures.remove(i).get();
+  static <T> Future<T> remove(List<Future<T>> furtures, int i) throws Exception {
+    return furtures.isEmpty()? null: furtures.remove(i);
   }
   
   static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
@@ -320,12 +322,13 @@ public class TestByteArrayManager {
     final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
     final Thread[] threads = new Thread[runners.length];
 
-    final int num = 1 << 8;
+    final int num = 1 << 10;
     for(int i = 0; i < runners.length; i++) {
       runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
       threads[i] = runners[i].start(num);
     }
     
+    final List<Exception> exceptions = new ArrayList<Exception>();
     final Thread randomRecycler = new Thread() {
       @Override
       public void run() {
@@ -336,10 +339,11 @@ public class TestByteArrayManager {
             runners[j].recycle();
           } catch (Exception e) {
             e.printStackTrace();
-            Assert.fail(this + " has " + e);
+            exceptions.add(new Exception(this + " has an exception", e));
           }
 
           if ((i & 0xFF) == 0) {
+            LOG.info("randomRecycler sleep, i=" + i);
             sleepMs(100);
           }
         }
@@ -361,6 +365,7 @@ public class TestByteArrayManager {
     randomRecycler.start();
     
     randomRecycler.join();
+    Assert.assertTrue(exceptions.isEmpty());
 
     Assert.assertNull(counters.get(0, false));
     for(int i = 1; i < runners.length; i++) {
@@ -392,7 +397,7 @@ public class TestByteArrayManager {
   }
 
   static class Runner implements Runnable {
-    static final int NUM_RUNNERS = 4;
+    static final int NUM_RUNNERS = 5;
 
     static int index2arrayLength(int index) {
       return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
@@ -453,16 +458,22 @@ public class TestByteArrayManager {
       return f;
     }
 
-    byte[] removeFirst() throws Exception {
+    Future<byte[]> removeFirst() throws Exception {
       synchronized (arrays) {
         return remove(arrays, 0);
       }
     }
 
     void recycle() throws Exception {
-      final byte[] a = removeFirst();
-      if (a != null) {
-        recycle(a);
+      final Future<byte[]> f = removeFirst();
+      if (f != null) {
+        printf("randomRecycler: ");
+        try {
+          recycle(f.get(10, TimeUnit.MILLISECONDS));
+        } catch(TimeoutException e) {
+          recycle(new byte[maxArrayLength]);
+          printf("timeout, new byte[%d]\n", maxArrayLength);
+        }
       }
     }
 
@@ -490,9 +501,9 @@ public class TestByteArrayManager {
           submitAllocate();
         } else {
           try {
-            final byte[] a = removeFirst();
-            if (a != null) {
-              submitRecycle(a);
+            final Future<byte[]> f = removeFirst();
+            if (f != null) {
+              submitRecycle(f.get());
             }
           } catch (Exception e) {
             e.printStackTrace();
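
One more note on the test changes: removeFirst()/removeLast() now return the
Future itself instead of calling get() while holding the arrays lock, and
recycle() bounds its wait with a timed get, so the recycler thread cannot
block behind an allocation that is itself stuck (the very bug under test).
The timed-get idiom, pulled out as a standalone sketch (takeOrFallback is a
hypothetical helper, not part of the test):

    byte[] takeOrFallback(Future<byte[]> f, int fallbackLength)
        throws InterruptedException, ExecutionException {
      try {
        return f.get(10, TimeUnit.MILLISECONDS);  // bounded wait
      } catch (TimeoutException e) {
        return new byte[fallbackLength];          // recycle a fresh array instead
      }
    }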