Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2010/06/19 08:27:07 UTC

svn commit: r956192 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java

Author: dhruba
Date: Sat Jun 19 06:27:07 2010
New Revision: 956192

URL: http://svn.apache.org/viewvc?rev=956192&view=rev
Log:
HDFS-947. An Hftp read request is redirected to a datanode that has 
the most replicas of the blocks in the file. (Dmytro Molkov via dhruba)
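
A minimal sketch of the idea described in the log message (illustrative only,
not part of the commit; the class and method names ReplicaFrequencyExample and
pickMostLoadedReplica are made up, only the HDFS types used are real): count
how many of the file's blocks each datanode holds and favor the node with the
highest count.

    // Illustrative only: count block replicas per datanode and return the
    // node that stores the most blocks of the file (null for an empty file).
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    class ReplicaFrequencyExample {
      static DatanodeInfo pickMostLoadedReplica(LocatedBlocks blocks) {
        Map<DatanodeInfo, Integer> counts = new HashMap<DatanodeInfo, Integer>();
        DatanodeInfo best = null;
        int bestCount = 0;
        for (LocatedBlock blk : blocks.getLocatedBlocks()) {
          for (DatanodeInfo node : blk.getLocations()) {
            Integer seen = counts.get(node);
            int updated = (seen == null) ? 1 : seen + 1;
            counts.put(node, updated);
            if (updated > bestCount) {
              bestCount = updated;
              best = node;
            }
          }
        }
        return best;
      }
    }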


Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=956192&r1=956191&r2=956192&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Sat Jun 19 06:27:07 2010
@@ -56,6 +56,9 @@ Trunk (unreleased changes)
 
     HDFS-752. Add interfaces classification to to HDFS source code. (suresh)
 
+    HDFS-947. An Hftp read request is redirected to a datanode that has 
+    the most replicas of the blocks in the file. (Dmytro Molkov via dhruba)
+
   BUG FIXES
 
     HDFS-1039. Adding test for  JspHelper.getUGI(jnp via boryas)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=956192&r1=956191&r2=956192&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Sat Jun 19 06:27:07 2010
@@ -24,9 +24,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Random;
 import java.util.TreeSet;
 
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
@@ -68,23 +71,89 @@ public class JspHelper {
 
   /** Private constructor for preventing creating JspHelper object. */
   private JspHelper() {} 
+  
+  // data structure to count number of blocks on datanodes.
+  private static class NodeRecord extends DatanodeInfo {
+    int frequency;
+
+    public NodeRecord() {
+      frequency = -1;
+    }
+    public NodeRecord(DatanodeInfo info, int count) {
+      super(info);
+      this.frequency = count;
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+      // Sufficient to use super equality as datanodes are uniquely identified
+      // by DatanodeID
+      return (this == obj) || super.equals(obj);
+    }
+    @Override
+    public int hashCode() {
+      // Super implementation is sufficient
+      return super.hashCode();
+    }
+  }
+ 
+  // compare two records based on their frequency
+  private static class NodeRecordComparator implements Comparator<NodeRecord> {
+
+    public int compare(NodeRecord o1, NodeRecord o2) {
+      if (o1.frequency < o2.frequency) {
+        return -1;
+      } else if (o1.frequency > o2.frequency) {
+        return 1;
+      } 
+      return 0;
+    }
+  }
+  public static DatanodeInfo bestNode(LocatedBlocks blks) throws IOException {
+    HashMap<DatanodeInfo, NodeRecord> map =
+      new HashMap<DatanodeInfo, NodeRecord>();
+    for (LocatedBlock block : blks.getLocatedBlocks()) {
+      DatanodeInfo[] nodes = block.getLocations();
+      for (DatanodeInfo node : nodes) {
+        NodeRecord record = map.get(node);
+        if (record == null) {
+          map.put(node, new NodeRecord(node, 1));
+        } else {
+          record.frequency++;
+        }
+      }
+    }
+    NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
+    Arrays.sort(nodes, new NodeRecordComparator());
+    return bestNode(nodes, false);
+  }
 
   public static DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
+    DatanodeInfo[] nodes = blk.getLocations();
+    return bestNode(nodes, true);
+  }
+
+  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
+    throws IOException {
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
     DatanodeInfo chosenNode = null;
     int failures = 0;
     Socket s = null;
-    DatanodeInfo [] nodes = blk.getLocations();
+    int index = -1;
     if (nodes == null || nodes.length == 0) {
       throw new IOException("No nodes contain this block");
     }
     while (s == null) {
       if (chosenNode == null) {
         do {
-          chosenNode = nodes[rand.nextInt(nodes.length)];
+          if (doRandom) {
+            index = rand.nextInt(nodes.length);
+          } else {
+            index++;
+          }
+          chosenNode = nodes[index];
         } while (deadNodes.contains(chosenNode));
       }
-      int index = rand.nextInt(nodes.length);
       chosenNode = nodes[index];
 
       //just ping to check whether the node is alive
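
In short, the patch above splits JspHelper.bestNode into three overloads: one
that counts replicas across a whole LocatedBlocks, sorts the nodes by
frequency, and picks sequentially; one that keeps the old per-block behavior;
and a shared helper whose doRandom flag either picks a random index or walks
the supplied array in order, skipping nodes already marked dead. A caller-side
sketch (illustrative only, not part of this commit; the class and method names
below are made up):

    // Illustrative only: the two public entry points added above, as a
    // servlet-side caller would use them.
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
    import org.apache.hadoop.hdfs.server.common.JspHelper;

    class BestNodeUsageExample {
      // Consider every block of the file when choosing where to redirect.
      static DatanodeInfo forWholeFile(LocatedBlocks blocks) throws IOException {
        return JspHelper.bestNode(blocks);
      }

      // Previous behavior: choose among the replicas of a single block.
      static DatanodeInfo forSingleBlock(LocatedBlock block) throws IOException {
        return JspHelper.bestNode(block);
      }
    }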

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=956192&r1=956191&r2=956192&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Sat Jun 19 06:27:07 2010
@@ -77,7 +77,7 @@ public class FileDataServlet extends Dfs
       NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
       return nn.getNamesystem().getRandomDatanode();
     }
-    return JspHelper.bestNode(blks.get(0));
+    return JspHelper.bestNode(blks);
   }
 
   /**

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java?rev=956192&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java Sat Jun 19 06:27:07 2010
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.HttpURLConnection;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.log4j.Level;
+
+/**
+ * Unittest for HftpFileSystem.
+ *
+ */
+public class TestHftpFileSystem extends TestCase {
+  private static final Random RAN = new Random();
+  private static final Path TEST_FILE = new Path("/testfile+1");
+  
+  private static Configuration config = null;
+  private static MiniDFSCluster cluster = null;
+  private static FileSystem hdfs = null;
+  private static HftpFileSystem hftpFs = null;
+  
+  /**
+   * Setup hadoop mini-cluster for test.
+   */
+  private static void oneTimeSetUp() throws IOException {
+    ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
+
+    final long seed = RAN.nextLong();
+    System.out.println("seed=" + seed);
+    RAN.setSeed(seed);
+
+    config = new Configuration();
+    config.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
+
+    cluster = new MiniDFSCluster(config, 2, true, null);
+    hdfs = cluster.getFileSystem();
+    final String hftpuri = 
+      "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    hftpFs = (HftpFileSystem) new Path(hftpuri).getFileSystem(config);
+  }
+  
+  /**
+   * Shutdown the hadoop mini-cluster.
+   */
+  private static void oneTimeTearDown() throws IOException {
+    hdfs.close();
+    hftpFs.close();
+    cluster.shutdown();
+  }
+  
+  public TestHftpFileSystem(String name) {
+    super(name);
+  }
+
+  /**
+   * For one time setup / teardown.
+   */
+  public static Test suite() {
+    TestSuite suite = new TestSuite();
+    
+    suite.addTestSuite(TestHftpFileSystem.class);
+    
+    return new TestSetup(suite) {
+      @Override
+      protected void setUp() throws IOException {
+        oneTimeSetUp();
+      }
+      
+      @Override
+      protected void tearDown() throws IOException {
+        oneTimeTearDown();
+      }
+    };
+  }
+  
+  public void testDataNodeRedirect() throws Exception {
+    if (hdfs.exists(TEST_FILE)) {
+      hdfs.delete(TEST_FILE, true);
+    }
+    FSDataOutputStream out = hdfs.create(TEST_FILE, (short) 1);
+    out.writeBytes("0123456789");
+    out.close();
+    
+    BlockLocation[] locations = 
+        hdfs.getFileBlockLocations(TEST_FILE, 0, 10);
+    String locationName = locations[0].getNames()[0];
+    URL u = hftpFs.getNamenodeFileURL(TEST_FILE);
+    HttpURLConnection conn = (HttpURLConnection)u.openConnection();
+    conn.setFollowRedirects(true);
+    conn.connect();
+    conn.getInputStream();
+    boolean checked = false;
+    // Find the datanode that has the block according to locations
+    // and check that the URL was redirected to this DN's info port
+    for (DataNode node : cluster.getDataNodes()) {
+      if (node.getDatanodeRegistration().getName().equals(locationName)) {
+        checked = true;
+        assertEquals(node.getDatanodeRegistration().getInfoPort(),
+                    conn.getURL().getPort());
+      }
+    }
+    assertTrue("The test never checked that location of " + 
+              "the block and hftp desitnation are the same", checked);
+  }
+  /**
+   * Tests getPos() functionality.
+   */
+  public void testGetPos() throws Exception {
+    // Write a test file.
+    FSDataOutputStream out = hdfs.create(TEST_FILE, true);
+    out.writeBytes("0123456789");
+    out.close();
+    
+    FSDataInputStream in = hftpFs.open(TEST_FILE);
+    
+    // Test read().
+    for (int i = 0; i < 5; ++i) {
+      assertEquals(i, in.getPos());
+      in.read();
+    }
+    
+    // Test read(b, off, len).
+    assertEquals(5, in.getPos());
+    byte[] buffer = new byte[10];
+    assertEquals(2, in.read(buffer, 0, 2));
+    assertEquals(7, in.getPos());
+    
+    // Test read(b).
+    int bytesRead = in.read(buffer);
+    assertEquals(7 + bytesRead, in.getPos());
+    
+    // Test EOF.
+    for (int i = 0; i < 100; ++i) {
+      in.read();
+    }
+    assertEquals(10, in.getPos());
+    in.close();
+  }
+  
+  /**
+   * Tests seek().
+   */
+  public void testSeek() throws Exception {
+    // Write a test file.
+    FSDataOutputStream out = hdfs.create(TEST_FILE, true);
+    out.writeBytes("0123456789");
+    out.close();
+    
+    FSDataInputStream in = hftpFs.open(TEST_FILE);
+    in.seek(7);
+    assertEquals('7', in.read());
+  }
+}