Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/19 07:00:29 UTC

svn commit: r638716 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java

Author: dhruba
Date: Tue Mar 18 23:00:24 2008
New Revision: 638716

URL: http://svn.apache.org/viewvc?rev=638716&view=rev
Log:
HADOOP-3033. The BlockReceiver thread in the datanode writes data to
the block file, changes file position (if needed) and flushes all by
itself. The PacketResponder thread does not flush block file. (dhruba)
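To illustrate the behavior the log describes, here is a minimal, self-contained sketch (not the actual DataNode code; class and method names are illustrative assumptions): the receiving thread writes each packet to the block file and flushes it itself before handing the sequence number to a separate responder thread, so the responder never touches the file and only acknowledges packets that are already on disk.

// Minimal sketch, assuming a simplified two-thread pipeline; not the DataNode API.
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

class PacketPipelineSketch {
  private final Deque<Long> ackQueue = new ArrayDeque<>();

  // Receiver side: write the packet, flush it, then enqueue the ack.
  void receivePacket(FileOutputStream blockOut, byte[] data, long seqno)
      throws IOException {
    blockOut.write(data);          // write packet data to the block file
    blockOut.flush();              // flush before any ack can be sent
    synchronized (this) {
      ackQueue.addLast(seqno);     // hand off to the responder thread
      notifyAll();
    }
  }

  // Responder side: wait until the receiver has flushed and enqueued a packet,
  // then take its sequence number for acknowledgement.
  long takeNextAck() throws InterruptedException {
    synchronized (this) {
      while (ackQueue.isEmpty()) {
        wait();                    // block until the receiver enqueues a seqno
      }
      return ackQueue.removeFirst();
    }
  }
}
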


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=638716&r1=638715&r2=638716&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 18 23:00:24 2008
@@ -287,6 +287,10 @@
     HADOOP-3011. Prohibit distcp from overwriting directories on the
     destination filesystem with files. (cdouglas)
 
+    HADOOP-3033. The BlockReceiver thread in the datanode writes data to 
+    the block file, changes file position (if needed) and flushes all by
+    itself. The PacketResponder thread does not flush block file. (dhruba)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=638716&r1=638715&r2=638716&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 18 23:00:24 2008
@@ -1937,11 +1937,6 @@
                          " from " + receiver.inAddr);
               }
               lastPacket = true;
-            } else {
-              // flush packet to disk before sending ack
-              if (!receiver.finalized) {
-                receiver.flush();
-              }
             }
 
             replyOut.writeLong(expected);
@@ -1990,6 +1985,15 @@
                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + seqno);
                 Packet pkt = null;
                 synchronized (this) {
+                  while (running && shouldRun && ackQueue.size() == 0) {
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug("PacketResponder " + numTargets + 
+                                " seqno = " + seqno +
+                                " for block " + block +
+                                " waiting for local datanode to finish write.");
+                    }
+                    wait();
+                  }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
                   notifyAll();
@@ -2026,12 +2030,6 @@
                        " of size " + block.getNumBytes() + 
                        " from " + receiver.inAddr);
             }
-            else if (!lastPacketInBlock) {
-              // flush packet to disk before sending ack
-              if (!receiver.finalized) {
-                receiver.flush();
-              }
-            }
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
@@ -2149,8 +2147,6 @@
     private FSDataset.BlockWriteStreams streams;
     private boolean isRecovery = false;
     private String clientName;
-    private Object currentWriteLock;
-    volatile private boolean currentWrite;
 
     BlockReceiver(Block block, DataInputStream in, String inAddr,
                   boolean isRecovery, String clientName)
@@ -2162,8 +2158,6 @@
         this.isRecovery = isRecovery;
         this.clientName = clientName;
         this.offsetInBlock = 0;
-        this.currentWriteLock = new Object();
-        this.currentWrite = false;
         this.checksum = DataChecksum.newDataChecksum(in);
         this.bytesPerChecksum = checksum.getBytesPerChecksum();
         this.checksumSize = checksum.getChecksumSize();
@@ -2190,18 +2184,6 @@
     // close files
     public void close() throws IOException {
 
-      synchronized (currentWriteLock) {
-        while (currentWrite) {
-          try {
-            LOG.info("BlockReceiver for block " + block +
-                     " waiting for last write to drain.");
-            currentWriteLock.wait();
-          } catch (InterruptedException e) {
-            throw new IOException("BlockReceiver for block " + block +
-                                  " interrupted drain of last io.");
-          }
-        }
-      }
       IOException ioe = null;
       // close checksum file
       try {
@@ -2262,11 +2244,6 @@
       }
 
       checksum.reset();
-
-      // record the fact that the current write is still in progress
-      synchronized (currentWriteLock) {
-        currentWrite = true;
-      }
       offsetInBlock += len;
 
       // First write to remote node before writing locally.
@@ -2287,10 +2264,6 @@
           // recovery.
           //
           if (clientName.length() > 0) {
-            synchronized (currentWriteLock) {
-              currentWrite = false;
-              currentWriteLock.notifyAll();
-            }
             throw ioe;
           }
         }
@@ -2306,11 +2279,6 @@
       } catch (IOException iex) {
         checkDiskError(iex);
         throw iex;
-      } finally {
-        synchronized (currentWriteLock) {
-          currentWrite = false;
-          currentWriteLock.notifyAll();
-        }
       }
 
       if (throttler != null) { // throttle I/O
@@ -2384,12 +2352,6 @@
             throw e;
           }
         }
-        // first enqueue the ack packet to avoid a race with the response coming
-        // from downstream datanode.
-        if (responder != null) {
-          ((PacketResponder)responder.getRunnable()).enqueue(seqno, 
-                                          lastPacketInBlock); 
-        }
       }
 
       if (len == 0) {
@@ -2443,8 +2405,11 @@
         curPacketSize += 4;
       }
 
+      /// flush entire packet before sending ack
+      flush();
+
       // put in queue for pending acks
-      if (responder != null && mirrorOut == null) {
+      if (responder != null) {
         ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                         lastPacketInBlock); 
       }