You are viewing a plain-text version of this content; the canonical link (lost in this text-only rendering) is available in the mailing-list archive.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/03/09 01:17:35 UTC

svn commit: r384385 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs: DFSClient.java DataNode.java FSNamesystem.java NameNode.java

Author: cutting
Date: Wed Mar  8 16:17:27 2006
New Revision: 384385

URL: http://svn.apache.org/viewcvs?rev=384385&view=rev
Log:
Fix for HADOOP-66.  DFS blocks are no longer written to local temp files.  If a connection to a datanode fails then an exception is now thrown, rather than trying to re-connect to another datanode.  Timeouts were also removed from datanode connections, since those timeouts had caused a lot of failed connections.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar  8 16:17:27 2006
@@ -377,7 +377,7 @@
                 }
                 try {
                     s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
-                    s.setSoTimeout(READ_TIMEOUT);
+                    //s.setSoTimeout(READ_TIMEOUT);
 
                     //
                     // Xmit header info to datanode
@@ -528,11 +528,8 @@
         private UTF8 src;
         boolean closingDown = false;
         private boolean overwrite;
-        private boolean blockStreamWorking;
         private DataOutputStream blockStream;
         private DataInputStream blockReplyStream;
-        private File backupFile;
-        private OutputStream backupStream;
         private Block block;
         private DatanodeInfo targets[]; 
         private long filePos = 0;
@@ -546,9 +543,7 @@
             this.overwrite = overwrite;
             this.blockStream = null;
             this.blockReplyStream = null;
-            this.blockStreamWorking = false;
-            this.backupFile = File.createTempFile("dfsout", "bak");
-            this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+
             nextBlockOutputStream(true);
         }
 
@@ -558,12 +553,6 @@
          * Must get block ID and the IDs of the destinations from the namenode.
          */
         private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
-            if (! firstTime && blockStreamWorking) {
-                blockStream.close();
-                blockReplyStream.close();
-                blockStreamWorking = false;
-            }
-
             boolean retry = false;
             long start = System.currentTimeMillis();
             do {
@@ -602,7 +591,7 @@
                 Socket s = null;
                 try {
                     s = new Socket(target.getAddress(), target.getPort());
-                    s.setSoTimeout(READ_TIMEOUT);
+                    //s.setSoTimeout(READ_TIMEOUT);
                 } catch (IOException ie) {
                     // Connection failed.  Let's wait a little bit and retry
                     try {
@@ -636,7 +625,6 @@
                 bytesWrittenToBlock = 0;
                 blockStream = out;
                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
-                blockStreamWorking = true;
             } while (retry);
         }
 
@@ -717,27 +705,21 @@
                 //
                 // To the blockStream, write length, then bytes
                 //
-                if (blockStreamWorking) {
-                    try {
-                        blockStream.writeLong(workingPos);
-                        blockStream.write(outBuf, 0, workingPos);
-                    } catch (IOException ie) {
-                        try {
-                            blockStream.close();
-                        } catch (IOException ie2) {
-                        }
-                        try {
-                            blockReplyStream.close();
-                        } catch (IOException ie2) {
-                        }
-                        namenode.abandonBlock(block, src.toString());
-                        blockStreamWorking = false;
-                    }
+                try {
+                  blockStream.writeLong(workingPos);
+                  blockStream.write(outBuf, 0, workingPos);
+                } catch (IOException ie) {
+                  try {
+                    blockStream.close();
+                  } catch (IOException ie2) {
+                  }
+                  try {
+                    blockReplyStream.close();
+                  } catch (IOException ie2) {
+                  }
+                  namenode.abandonBlock(block, src.toString());
+                  throw ie;
                 }
-                //
-                // To the local block backup, write just the bytes
-                //
-                backupStream.write(outBuf, 0, workingPos);
 
                 //
                 // Track position
@@ -752,79 +734,20 @@
          * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
-            boolean mustRecover = ! blockStreamWorking;
-
-            //
-            // A zero-length set of data indicates the end of the block
-            //
-            if (blockStreamWorking) {
-                try {
-                    internalClose();
-                } catch (IOException ie) {
-                    try {
-                        blockStream.close();
-                    } catch (IOException ie2) {
-                    }
-                    try {
-                        blockReplyStream.close();
-                    } catch (IOException ie2) {
-                    }
-                    namenode.abandonBlock(block, src.toString());
-                    mustRecover = true;
-                } finally {
-                    blockStreamWorking = false;
-                }
-            }
-
-            //
-            // Done with local copy
-            //
-            backupStream.close();
-
-            //
-            // If necessary, recover from a failed datanode connection.
-            //
-            while (mustRecover) {
-                nextBlockOutputStream(false);
-                InputStream in = new FileInputStream(backupFile);
-                try {
-                    byte buf[] = new byte[BUFFER_SIZE];
-                    int bytesRead = in.read(buf);
-                    while (bytesRead >= 0) {
-                        blockStream.writeLong((long) bytesRead);
-                        blockStream.write(buf, 0, bytesRead);
-                        bytesRead = in.read(buf);
-                    }
-                    internalClose();
-                    LOG.info("Recovered from failed datanode connection");
-                    mustRecover = false;
-                } catch (IOException ie) {
-                    try {
-                        blockStream.close();
-                    } catch (IOException ie2) {
-                    }
-                    try {
-                        blockReplyStream.close();
-                    } catch (IOException ie2) {
-                    }
-                    namenode.abandonBlock(block, src.toString());
-                    blockStreamWorking = false;
-                }
+            try {
+              internalClose();
+            } catch (IOException ie) {
+              namenode.abandonBlock(block, src.toString());
+              throw ie;
             }
-
-            //
-            // Delete local backup, start new one
-            //
-            backupFile.delete();
-            backupFile = File.createTempFile("dfsout", "bak");
-            backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
         }
 
         /**
-         * Close down stream to remote datanode.  Called from two places
-         * in endBlock();
+         * Close down stream to remote datanode.
          */
         private synchronized void internalClose() throws IOException {
+          try {
+            // A zero-length set of data indicates the end of the block
             blockStream.writeLong(0);
             blockStream.flush();
 
@@ -838,8 +761,16 @@
             lb.readFields(blockReplyStream);
             namenode.reportWrittenBlock(lb);
 
-            blockStream.close();
-            blockReplyStream.close();
+          } finally {
+            try {
+              blockStream.close();
+            } catch (IOException ie2) {
+            }
+            try {
+              blockReplyStream.close();
+            } catch (IOException ie2) {
+            }
+          }
         }
 
         /**
@@ -855,14 +786,9 @@
             flush();
             endBlock();
 
-            backupStream.close();
-            backupFile.delete();
+            blockStream.close();                
+            blockReplyStream.close();
 
-            if (blockStreamWorking) {
-                blockStream.close();                
-                blockReplyStream.close();
-                blockStreamWorking = false;
-            }
             super.close();
 
             long localstart = System.currentTimeMillis();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Mar  8 16:17:27 2006
@@ -286,7 +286,7 @@
             try {
                 while (shouldListen) {
                     Socket s = ss.accept();
-                    s.setSoTimeout(READ_TIMEOUT);
+                    //s.setSoTimeout(READ_TIMEOUT);
                     new Daemon(new DataXceiver(s)).start();
                 }
                 ss.close();
@@ -368,10 +368,10 @@
                                     // Connect to backup machine
                                     mirrorTarget = createSocketAddr(targets[1].getName().toString());
                                     try {
-                                        Socket s = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
-                                        s.setSoTimeout(READ_TIMEOUT);
-                                        out2 = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
-                                        in2 = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+                                        Socket s2 = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
+                                        //s2.setSoTimeout(READ_TIMEOUT);
+                                        out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
+                                        in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
 
                                         // Write connection header
                                         out2.write(OP_WRITE_BLOCK);
@@ -507,6 +507,7 @@
                             mirrors.add(curTarget);
                             LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
                             newLB.write(reply);
+                            reply.flush();
                         } finally {
                             reply.close();
                         }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Mar  8 16:17:27 2006
@@ -1277,15 +1277,14 @@
         //
         // Build list of machines we can actually choose from
         //
-        long latestRemaining = 0;
         Vector targetList = new Vector();
         for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
             DatanodeInfo node = (DatanodeInfo) it.next();
             if (! forbiddenMachines.contains(node.getHost())) {
                 targetList.add(node);
-                latestRemaining += node.getRemaining();
             }
         }
+        Collections.shuffle(targetList);
 
         //
         // Now pick one
@@ -1309,12 +1308,9 @@
             //
             // Otherwise, choose node according to target capacity
             //
-            double target = Math.abs(r.nextDouble()) * latestRemaining;
             for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
-                target -= node.getRemaining();
-                if ((node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) &&
-                    (target <= 0)) {
+                if ((node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE)) {
                     return node;
                 }
             }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=384385&r1=384384&r2=384385&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Mar  8 16:17:27 2006
@@ -184,9 +184,7 @@
      * The client needs to give up on the block.
      */
     public void abandonBlock(Block b, String src) throws IOException {
-        if (! namesystem.abandonBlock(b, new UTF8(src))) {
-            throw new IOException("Cannot abandon block during write to " + src);
-        }
+        namesystem.abandonBlock(b, new UTF8(src));
     }
     /**
      */