You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/03/31 00:33:29 UTC

svn commit: r390262 - /lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java

Author: cutting
Date: Thu Mar 30 14:33:27 2006
New Revision: 390262

URL: http://svn.apache.org/viewcvs?rev=390262&view=rev
Log:
Fix for HADOOP-107.  As they were written, dfs blocks were both trickled to a datanode and tee'd to a temp file (in case the connection to the datanode failed).  Now they're only written to the temp file, with no connection to the datanode made until the block is complete.  This reduces the number of long-lived mostly-idle connections to datanodes, which was causing problems.  It also simplifies the DFSClient code significantly.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=390262&r1=390261&r2=390262&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Mar 30 14:33:27 2006
@@ -526,9 +526,8 @@
         private int pos = 0;
 
         private UTF8 src;
-        boolean closingDown = false;
         private boolean overwrite;
-        private boolean blockStreamWorking;
+        private boolean firstTime = true;
         private DataOutputStream blockStream;
         private DataInputStream blockReplyStream;
         private File backupFile;
@@ -543,13 +542,8 @@
         public DFSOutputStream(UTF8 src, boolean overwrite) throws IOException {
             this.src = src;
             this.overwrite = overwrite;
-            this.blockStream = null;
-            this.blockReplyStream = null;
-            this.blockStreamWorking = false;
             this.backupFile = newBackupFile();
-
-            this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
-            nextBlockOutputStream(true);
+            this.backupStream = new FileOutputStream(backupFile);
         }
 
         private File newBackupFile() throws IOException {
@@ -565,13 +559,7 @@
          * This happens when a file is created and each time a new block is allocated.
          * Must get block ID and the IDs of the destinations from the namenode.
          */
-        private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
-            if (! firstTime && blockStreamWorking) {
-                blockStream.flush();
-                s.close();
-                blockStreamWorking = false;
-            }
-
+        private synchronized void nextBlockOutputStream() throws IOException {
             boolean retry = false;
             long start = System.currentTimeMillis();
             do {
@@ -644,8 +632,8 @@
                 bytesWrittenToBlock = 0;
                 blockStream = out;
                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-                blockStreamWorking = true;
             } while (retry);
+            firstTime = false;
         }
 
         /**
@@ -708,7 +696,6 @@
             }
             if (bytesWrittenToBlock == BLOCK_SIZE) {
                 endBlock();
-                nextBlockOutputStream(false);
             }
             flushData(pos);
         }
@@ -720,19 +707,7 @@
         private synchronized void flushData(int maxPos) throws IOException {
             int workingPos = Math.min(pos, maxPos);
             
-            if (workingPos > 0 || 
-                (workingPos == 0 && closingDown)) {
-                //
-                // To the blockStream, write length, then bytes
-                //
-                if (blockStreamWorking) {
-                    try {
-                        blockStream.writeLong(workingPos);
-                        blockStream.write(outBuf, 0, workingPos);
-                    } catch (IOException ie) {
-                        handleSocketException(ie);
-                    }
-                }
+            if (workingPos > 0) {
                 //
                 // To the local block backup, write just the bytes
                 //
@@ -751,43 +726,27 @@
          * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
-            boolean mustRecover = ! blockStreamWorking;
-
-            //
-            // A zero-length set of data indicates the end of the block
-            //
-            if (blockStreamWorking) {
-                try {
-                    internalClose();
-                } catch (IOException ie) {
-                    handleSocketException(ie);
-                    mustRecover = true;
-                } finally {
-                    blockStreamWorking = false;
-                }
-            }
-
             //
             // Done with local copy
             //
             backupStream.close();
 
             //
-            // If necessary, recover from a failed datanode connection.
+            // Send it to datanode
             //
+            boolean mustRecover = true;
             while (mustRecover) {
-                nextBlockOutputStream(false);
+                nextBlockOutputStream();
                 InputStream in = new FileInputStream(backupFile);
                 try {
                     byte buf[] = new byte[BUFFER_SIZE];
                     int bytesRead = in.read(buf);
-                    while (bytesRead >= 0) {
+                    while (bytesRead > 0) {
                         blockStream.writeLong((long) bytesRead);
                         blockStream.write(buf, 0, bytesRead);
                         bytesRead = in.read(buf);
                     }
                     internalClose();
-                    LOG.info("Recovered from failed datanode connection");
                     mustRecover = false;
                 } catch (IOException ie) {
                     handleSocketException(ie);
@@ -801,12 +760,12 @@
             //
             backupFile.delete();
             backupFile = newBackupFile();
-            backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+            backupStream = new FileOutputStream(backupFile);
+            bytesWrittenToBlock = 0;
         }
 
         /**
-         * Close down stream to remote datanode.  Called from two places
-         * in endBlock();
+         * Close down stream to remote datanode.
          */
         private synchronized void internalClose() throws IOException {
             blockStream.writeLong(0);
@@ -823,16 +782,19 @@
             namenode.reportWrittenBlock(lb);
 
             s.close();
+            s = null;
         }
 
         private void handleSocketException(IOException ie) throws IOException {
           LOG.log(Level.WARNING, "Error while writing.", ie);
           try {
-            s.close();
+            if (s != null) {
+              s.close();
+              s = null;
+            }
           } catch (IOException ie2) {
             LOG.log(Level.WARNING, "Error closing socket.", ie2);
           }
-          blockStreamWorking = false;
           namenode.abandonBlock(block, src.toString());
         }
 
@@ -845,16 +807,17 @@
                 throw new IOException("Stream closed");
             }
 
-            closingDown = true;
             flush();
-            endBlock();
+            if (filePos == 0 || bytesWrittenToBlock != 0) {
+              endBlock();
+            }
 
             backupStream.close();
             backupFile.delete();
 
-            if (blockStreamWorking) {
+            if (s != null) {
                 s.close();
-                blockStreamWorking = false;
+                s = null;
             }
             super.close();
 
@@ -873,7 +836,6 @@
                 }
             }
             closed = true;
-            closingDown = false;
         }
     }
 }