Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/05 07:51:34 UTC

svn commit: r633780 - in /hadoop/core/branches/branch-0.16: CHANGES.txt src/java/org/apache/hadoop/dfs/DataNode.java src/java/org/apache/hadoop/dfs/FSDataset.java

Author: dhruba
Date: Tue Mar  4 22:51:33 2008
New Revision: 633780

URL: http://svn.apache.org/viewvc?rev=633780&view=rev
Log:
HADOOP-2883. Write failures and data corruptions on HDFS files.
The write timeout is reverted to what it was in the 0.15 release. Also,
the datanode now flushes the block file's buffered output stream before
sending a positive ack for the packet back to the client. (dhruba)

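For illustration only (the numbers below are assumptions, not values taken from the patch): with numTargets = 2 remaining downstream datanodes and a socket timeout of 60 seconds, the old 0.16 formula gave 3000 * 2 + 60000 = 66000 ms, while the restored formula gives 2 * 60000 = 120000 ms. The mirror connect/read timeout therefore scales with the full per-node socket timeout again, instead of adding only 3 seconds per downstream target.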

Modified:
    hadoop/core/branches/branch-0.16/CHANGES.txt
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/FSDataset.java

Modified: hadoop/core/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/CHANGES.txt?rev=633780&r1=633779&r2=633780&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.16/CHANGES.txt Tue Mar  4 22:51:33 2008
@@ -105,6 +105,11 @@
     HADOOP-2931. IOException thrown by DFSOutputStream had wrong stack
     trace in some cases. (Michael Bieniosek via rangadi)
 
+    HADOOP-2883. Write failures and data corruptions on HDFS files.
+    The write timeout is reverted to what it was in the 0.15 release. Also,
+    the datanode now flushes the block file's buffered output stream before
+    sending a positive ack for the packet back to the client. (dhruba)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java?rev=633780&r1=633779&r2=633780&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar  4 22:51:33 2008
@@ -1088,7 +1088,7 @@
           mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
           mirrorSock = new Socket();
           try {
-            int timeoutValue = 3000 * numTargets + socketTimeout;
+            int timeoutValue = numTargets * socketTimeout;
             mirrorSock.connect(mirrorTarget, timeoutValue);
             mirrorSock.setSoTimeout(timeoutValue);
             mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -1777,6 +1777,11 @@
                          " from " + receiver.inAddr);
               }
               lastPacket = true;
+            } else {
+              // flush packet to disk before sending ack
+              if (!receiver.finalized) {
+                receiver.flush();
+              }
             }
 
             replyOut.writeLong(expected);
@@ -1861,6 +1866,12 @@
                        " of size " + block.getNumBytes() + 
                        " from " + receiver.inAddr);
             }
+            else if (!lastPacketInBlock) {
+              // flush packet to disk before sending ack
+              if (!receiver.finalized) {
+                receiver.flush();
+              }
+            }
 
             // send my status back to upstream datanode
             replyOut.writeLong(expected); // send seqno upstream
@@ -2036,6 +2047,16 @@
       }
     }
 
+    // flush block data and metadata files to disk.
+    void flush() throws IOException {
+      if (checksumOut != null) {
+        checksumOut.flush();
+      }
+      if (out != null) {
+        out.flush();
+      }
+    }
+
     /* receive a chunk: write it to disk & mirror it to another stream */
     private void receiveChunk( int len ) throws IOException {
       if (len <= 0 || len > bytesPerChecksum) {
@@ -2326,7 +2347,7 @@
       if (checksumOut != null) {
         checksumOut.flush();
       }
-      LOG.info("Changing block file offset from " + 
+      LOG.info("Changing block file offset of block " + block + " from " + 
                data.getChannelPosition(block, streams) +
                " to " + offsetInBlock +
                " meta file offset to " + offsetInChecksum);

Modified: hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=633780&r1=633779&r2=633780&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/branches/branch-0.16/src/java/org/apache/hadoop/dfs/FSDataset.java Tue Mar  4 22:51:33 2008
@@ -343,6 +343,14 @@
       }
       return f;
     }
+
+    /**
+     * Returns the name of the temporary file for this block.
+     */
+    File getTmpFile(Block b) throws IOException {
+      File f = new File(tmpDir, b.getBlockName());
+      return f;
+    }
       
     File addBlock(Block b, File f) throws IOException {
       File blockFile = dataDir.addBlock(b, f);
@@ -698,6 +706,18 @@
   public void setChannelPosition(Block b, BlockWriteStreams streams, 
                                  long dataOffset, long ckOffset) 
                                  throws IOException {
+    long size = 0;
+    synchronized (this) {
+      FSVolume vol = volumeMap.get(b);
+      size = vol.getTmpFile(b).length();
+    }
+    if (size < dataOffset) {
+      String msg = "Trying to change block file offset of block " + b +
+                     " to " + dataOffset +
+                     " but actual size of file is " +
+                     size;
+      throw new IOException(msg);
+    }
     FileOutputStream file = (FileOutputStream) streams.dataOut;
     file.getChannel().position(dataOffset);
     file = (FileOutputStream) streams.checksumOut;
@@ -717,7 +737,7 @@
       return vol.createTmpFile(blk);
     }
   }
-  
+
   //
   // REMIND - mjc - eventually we should have a timeout system
   // in place to clean up block files left by abandoned clients.
@@ -848,7 +868,7 @@
   }
 
   /**
-   * check if a data diretory is healthy
+   * check if a data directory is healthy
    * @throws DiskErrorException
    */
   public void checkDataDir() throws DiskErrorException {
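
A note on the new sanity check in setChannelPosition(): if the buffered block stream had not been flushed, the temporary block file on disk could be shorter than the offset the datanode is about to seek to, and positioning a FileChannel past end-of-file and then writing silently grows the file with unspecified bytes. A minimal sketch of that behavior (class and file names are hypothetical, not from FSDataset.java):

    // Hypothetical illustration only -- not code from the patch.
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class OffsetPastEof {
      public static void main(String[] args) throws IOException {
        FileOutputStream fos = new FileOutputStream("tmpblock");
        FileChannel ch = fos.getChannel();

        ch.position(4096);                                    // file is still 0 bytes long
        ch.write(ByteBuffer.wrap("data".getBytes("UTF-8")));  // file grows to 4100 bytes

        // The first 4096 bytes were never written; their contents are unspecified,
        // which in a block file would look like silent data corruption.
        System.out.println("size: " + ch.size());
        fos.close();
      }
    }

Rejecting such an offset up front, as the new check does, turns what would have been a silent gap in the block file into a visible IOException.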