Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2010/06/19 08:27:07 UTC
svn commit: r956192 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java
Author: dhruba
Date: Sat Jun 19 06:27:07 2010
New Revision: 956192
URL: http://svn.apache.org/viewvc?rev=956192&view=rev
Log:
HDFS-947. An Hftp read request is redirected to a datanode that has
the most replicas of the blocks in the file. (Dmytro Molkov via dhruba)
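
In essence, the patch tallies how often each datanode appears across all of a
file's block locations and prefers the node hosting the most replicas over a
random pick among the first block's replicas. A minimal sketch of the counting
idea (plain strings stand in for Hadoop's DatanodeInfo; this is an
illustration, not the committed code, which follows in the JspHelper diff
below):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BestNodeSketch {
      /** Return the host that appears most often across all block locations. */
      static String bestNode(List<String[]> blockLocations) {
        Map<String, Integer> frequency = new HashMap<String, Integer>();
        for (String[] replicaHosts : blockLocations) {  // one entry per block
          for (String host : replicaHosts) {            // one entry per replica
            Integer count = frequency.get(host);
            frequency.put(host, count == null ? 1 : count + 1);
          }
        }
        String best = null;
        int bestCount = -1;
        for (Map.Entry<String, Integer> e : frequency.entrySet()) {
          if (e.getValue() > bestCount) {
            best = e.getKey();
            bestCount = e.getValue();
          }
        }
        return best;  // null only if the file reported no locations at all
      }
    }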
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=956192&r1=956191&r2=956192&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Sat Jun 19 06:27:07 2010
@@ -56,6 +56,9 @@ Trunk (unreleased changes)
HDFS-752. Add interfaces classification to HDFS source code. (suresh)
+ HDFS-947. An Hftp read request is redirected to a datanode that has
+ the most replicas of the blocks in the file. (Dmytro Molkov via dhruba)
+
BUG FIXES
HDFS-1039. Adding test for JspHelper.getUGI(). (jnp via boryas)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=956192&r1=956191&r2=956192&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Sat Jun 19 06:27:07 2010
@@ -24,9 +24,11 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLEncoder;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Random;
import java.util.TreeSet;
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
@@ -68,23 +71,89 @@ public class JspHelper {
/** Private constructor for preventing creating JspHelper object. */
private JspHelper() {}
+
+ // data structure to count number of blocks on datanodes.
+ private static class NodeRecord extends DatanodeInfo {
+ int frequency;
+
+ public NodeRecord() {
+ frequency = -1;
+ }
+ public NodeRecord(DatanodeInfo info, int count) {
+ super(info);
+ this.frequency = count;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // Sufficient to use super equality as datanodes are uniquely identified
+ // by DatanodeID
+ return (this == obj) || super.equals(obj);
+ }
+ @Override
+ public int hashCode() {
+ // Super implementation is sufficient
+ return super.hashCode();
+ }
+ }
+
+ // compare two records based on their frequency
+ private static class NodeRecordComparator implements Comparator<NodeRecord> {
+
+ public int compare(NodeRecord o1, NodeRecord o2) {
+ if (o1.frequency < o2.frequency) {
+ return -1;
+ } else if (o1.frequency > o2.frequency) {
+ return 1;
+ }
+ return 0;
+ }
+ }
+ public static DatanodeInfo bestNode(LocatedBlocks blks) throws IOException {
+ HashMap<DatanodeInfo, NodeRecord> map =
+ new HashMap<DatanodeInfo, NodeRecord>();
+ for (LocatedBlock block : blks.getLocatedBlocks()) {
+ DatanodeInfo[] nodes = block.getLocations();
+ for (DatanodeInfo node : nodes) {
+ NodeRecord record = map.get(node);
+ if (record == null) {
+ map.put(node, new NodeRecord(node, 1));
+ } else {
+ record.frequency++;
+ }
+ }
+ }
+ NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
+ Arrays.sort(nodes, new NodeRecordComparator());
+ return bestNode(nodes, false);
+ }
public static DatanodeInfo bestNode(LocatedBlock blk) throws IOException {
+ DatanodeInfo[] nodes = blk.getLocations();
+ return bestNode(nodes, true);
+ }
+
+ public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
+ throws IOException {
TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
DatanodeInfo chosenNode = null;
int failures = 0;
Socket s = null;
- DatanodeInfo [] nodes = blk.getLocations();
+ int index = -1;
if (nodes == null || nodes.length == 0) {
throw new IOException("No nodes contain this block");
}
while (s == null) {
if (chosenNode == null) {
do {
- chosenNode = nodes[rand.nextInt(nodes.length)];
+ if (doRandom) {
+ index = rand.nextInt(nodes.length);
+ } else {
+ index++;
+ }
+ chosenNode = nodes[index];
} while (deadNodes.contains(chosenNode));
}
- int index = rand.nextInt(nodes.length);
chosenNode = nodes[index];
//just ping to check whether the node is alive
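
The hunk is truncated here; the "ping" the comment refers to is simply an
attempt to open a TCP connection to the chosen datanode. A sketch of that
retry-loop body under stated assumptions (READ_TIMEOUT_MS stands in for the
HDFS read-timeout constant; getHost()/getPort() are the DatanodeID accessors
of that era):

    // Liveness check: try to connect; on failure, mark the node dead,
    // reset state, and let the enclosing while (s == null) loop pick
    // the next candidate.
    try {
      s = new Socket();
      s.connect(new InetSocketAddress(chosenNode.getHost(),
          chosenNode.getPort()), READ_TIMEOUT_MS);
    } catch (IOException e) {
      deadNodes.add(chosenNode);
      chosenNode = null;
      s = null;
    }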
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=956192&r1=956191&r2=956192&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Sat Jun 19 06:27:07 2010
@@ -77,7 +77,7 @@ public class FileDataServlet extends Dfs
NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
return nn.getNamesystem().getRandomDatanode();
}
- return JspHelper.bestNode(blks.get(0));
+ return JspHelper.bestNode(blks);
}
/**
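
The FileDataServlet change above is a one-line call-site update: the servlet
now hands JspHelper the file's full LocatedBlocks rather than only its first
block, so the redirect target is chosen across every block of the file. A
hedged usage sketch (blks is the LocatedBlocks the servlet already holds;
surrounding plumbing elided):

    // New whole-file form: frequency-based choice across all blocks.
    DatanodeInfo target = JspHelper.bestNode(blks);
    // Old single-block form (still available): random pick among the
    // replicas of one block.
    DatanodeInfo legacy = JspHelper.bestNode(blks.get(0));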
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java?rev=956192&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHftpFileSystem.java Sat Jun 19 06:27:07 2010
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.HttpURLConnection;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.log4j.Level;
+
+/**
+ * Unit tests for HftpFileSystem.
+ *
+ */
+public class TestHftpFileSystem extends TestCase {
+ private static final Random RAN = new Random();
+ private static final Path TEST_FILE = new Path("/testfile+1");
+
+ private static Configuration config = null;
+ private static MiniDFSCluster cluster = null;
+ private static FileSystem hdfs = null;
+ private static HftpFileSystem hftpFs = null;
+
+ /**
+ * Setup hadoop mini-cluster for test.
+ */
+ private static void oneTimeSetUp() throws IOException {
+ ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
+
+ final long seed = RAN.nextLong();
+ System.out.println("seed=" + seed);
+ RAN.setSeed(seed);
+
+ config = new Configuration();
+ config.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
+
+ cluster = new MiniDFSCluster(config, 2, true, null);
+ hdfs = cluster.getFileSystem();
+ final String hftpuri =
+ "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ hftpFs = (HftpFileSystem) new Path(hftpuri).getFileSystem(config);
+ }
+
+ /**
+ * Shutdown the hadoop mini-cluster.
+ */
+ private static void oneTimeTearDown() throws IOException {
+ hdfs.close();
+ hftpFs.close();
+ cluster.shutdown();
+ }
+
+ public TestHftpFileSystem(String name) {
+ super(name);
+ }
+
+ /**
+ * For one time setup / teardown.
+ */
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+
+ suite.addTestSuite(TestHftpFileSystem.class);
+
+ return new TestSetup(suite) {
+ @Override
+ protected void setUp() throws IOException {
+ oneTimeSetUp();
+ }
+
+ @Override
+ protected void tearDown() throws IOException {
+ oneTimeTearDown();
+ }
+ };
+ }
+
+ public void testDataNodeRedirect() throws Exception {
+ if (hdfs.exists(TEST_FILE)) {
+ hdfs.delete(TEST_FILE, true);
+ }
+ FSDataOutputStream out = hdfs.create(TEST_FILE, (short) 1);
+ out.writeBytes("0123456789");
+ out.close();
+
+ BlockLocation[] locations =
+ hdfs.getFileBlockLocations(TEST_FILE, 0, 10);
+ String locationName = locations[0].getNames()[0];
+ URL u = hftpFs.getNamenodeFileURL(TEST_FILE);
+ HttpURLConnection conn = (HttpURLConnection)u.openConnection();
+ conn.setFollowRedirects(true);
+ conn.connect();
+ conn.getInputStream();
+ boolean checked = false;
+ // Find the datanode that has the block according to locations
+ // and check that the URL was redirected to this DN's info port
+ for (DataNode node : cluster.getDataNodes()) {
+ if (node.getDatanodeRegistration().getName().equals(locationName)) {
+ checked = true;
+ assertEquals(node.getDatanodeRegistration().getInfoPort(),
+ conn.getURL().getPort());
+ }
+ }
+ assertTrue("The test never checked that the location of " +
+ "the block and the hftp destination are the same", checked);
+ }
+ /**
+ * Tests getPos() functionality.
+ */
+ public void testGetPos() throws Exception {
+ // Write a test file.
+ FSDataOutputStream out = hdfs.create(TEST_FILE, true);
+ out.writeBytes("0123456789");
+ out.close();
+
+ FSDataInputStream in = hftpFs.open(TEST_FILE);
+
+ // Test read().
+ for (int i = 0; i < 5; ++i) {
+ assertEquals(i, in.getPos());
+ in.read();
+ }
+
+ // Test read(b, off, len).
+ assertEquals(5, in.getPos());
+ byte[] buffer = new byte[10];
+ assertEquals(2, in.read(buffer, 0, 2));
+ assertEquals(7, in.getPos());
+
+ // Test read(b).
+ int bytesRead = in.read(buffer);
+ assertEquals(7 + bytesRead, in.getPos());
+
+ // Test EOF.
+ for (int i = 0; i < 100; ++i) {
+ in.read();
+ }
+ assertEquals(10, in.getPos());
+ in.close();
+ }
+
+ /**
+ * Tests seek().
+ */
+ public void testSeek() throws Exception {
+ // Write a test file.
+ FSDataOutputStream out = hdfs.create(TEST_FILE, true);
+ out.writeBytes("0123456789");
+ out.close();
+
+ FSDataInputStream in = hftpFs.open(TEST_FILE);
+ in.seek(7);
+ assertEquals('7', in.read());
+ }
+}