Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/11 01:41:10 UTC

svn commit: r564804 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs: FileDataServlet.java HftpFileSystem.java ListPathsServlet.java

Author: omalley
Date: Fri Aug 10 16:41:10 2007
New Revision: 564804

URL: http://svn.apache.org/viewvc?view=rev&rev=564804
Log:
HADOOP-1568. Missed files

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java?view=auto&rev=564804
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java Fri Aug 10 16:41:10 2007
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/** Redirect queries about the hosted filesystem to an appropriate datanode.
+ * @see org.apache.hadoop.dfs.HftpFileSystem
+ */
+public class FileDataServlet extends HttpServlet {
+
+  static URI getUri(DFSFileInfo i, NameNode nn)
+      throws IOException, URISyntaxException {
+    final DatanodeInfo host = pickSrcDatanode(i, nn);
+    return new URI("http", null, host.getHostName(), host.getInfoPort(),
+          "/streamFile", "filename=" + i.getPath(), null);
+  }
+
+  private final static int BLOCK_SAMPLE = 5;
+
+  /** Select a datanode to service this request.
+   * Currently, this looks at no more than the first five blocks of a file,
+   * selecting a datanode randomly from the most represented.
+   */
+  protected static DatanodeInfo pickSrcDatanode(DFSFileInfo i, NameNode nn)
+      throws IOException {
+    long sample;
+    if (i.getLen() == 0) sample = 1;
+    else sample = i.getLen() / i.getBlockSize() > BLOCK_SAMPLE
+        ? i.getBlockSize() * BLOCK_SAMPLE - 1
+        : i.getLen();
+    final LocatedBlocks blks = nn.getBlockLocations(
+        i.getPath().toUri().getPath(), 0, sample);
+    HashMap<DatanodeInfo, Integer> count = new HashMap<DatanodeInfo, Integer>();
+    for (LocatedBlock b : blks.getLocatedBlocks()) {
+      for (DatanodeInfo d : b.getLocations()) {
+        if (!count.containsKey(d)) {
+          count.put(d, 0);
+        }
+        count.put(d, count.get(d) + 1);
+      }
+    }
+    ArrayList<DatanodeInfo> loc = new ArrayList<DatanodeInfo>();
+    int max = 0;
+    for (Map.Entry<DatanodeInfo, Integer> e : count.entrySet()) {
+      if (e.getValue() > max) {
+        loc.clear();
+        max = e.getValue();
+      }
+      if (e.getValue() == max) {
+        loc.add(e.getKey());
+      }
+    }
+    if (loc.isEmpty()) {  // e.g. a zero-length file has no block locations
+      throw new IOException("No datanodes available to serve " + i.getPath());
+    }
+    final Random r = new Random();
+    return loc.get(r.nextInt(loc.size()));
+  }
+
+  /**
+   * Service a GET request as described below.
+   * Request:
+   * {@code
+   * GET http://<nn>:<port>/data[/<path>] HTTP/1.1
+   * }
+   */
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+    throws ServletException, IOException {
+
+    try {
+      final String path = request.getPathInfo() != null
+        ? request.getPathInfo() : "/";
+      final NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+      DFSFileInfo info = nn.getFileInfo(path);
+      if (!info.isDir()) {
+        response.sendRedirect(getUri(info, nn).toURL().toString());
+      } else {
+        response.sendError(400, "cat: " + path + ": is a directory");
+      }
+    } catch (URISyntaxException e) {
+      response.getWriter().println(e.toString());
+    } catch (IOException e) {
+      response.sendError(400, e.getMessage());
+    }
+  }
+
+}
+

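The pickSrcDatanode heuristic above tallies how often each datanode appears
among the replica locations of the sampled blocks, keeps the set of most
represented nodes, and picks one of them uniformly at random. A standalone
sketch of that tally-and-pick logic, with plain strings standing in for
DatanodeInfo (the host names below are illustrative only, not part of the
commit):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    public class PickMostRepresented {
      /** Count replica appearances, keep the arg-max set, pick at random. */
      static String pick(String[][] blockLocations) {
        Map<String, Integer> count = new HashMap<String, Integer>();
        for (String[] block : blockLocations) {
          for (String host : block) {
            Integer c = count.get(host);
            count.put(host, c == null ? 1 : c + 1);
          }
        }
        ArrayList<String> best = new ArrayList<String>();
        int max = 0;
        for (Map.Entry<String, Integer> e : count.entrySet()) {
          if (e.getValue() > max) {   // new maximum: reset the candidate set
            best.clear();
            max = e.getValue();
          }
          if (e.getValue() == max) {  // ties stay in the candidate set
            best.add(e.getKey());
          }
        }
        return best.get(new Random().nextInt(best.size()));
      }

      public static void main(String[] args) {
        // dn2 holds a replica of both blocks, so it is always selected.
        String[][] locations = { { "dn1", "dn2" }, { "dn2", "dn3" } };
        System.out.println(pick(locations));
      }
    }
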
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java?view=auto&rev=564804
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java Fri Aug 10 16:41:10 2007
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TimeZone;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/** An implementation of a protocol for accessing filesystems over HTTP,
+ * providing a limited, read-only interface to the hosted filesystem.
+ * @see org.apache.hadoop.dfs.ListPathsServlet
+ * @see org.apache.hadoop.dfs.FileDataServlet
+ */
+public class HftpFileSystem extends FileSystem {
+  static {
+    HttpURLConnection.setFollowRedirects(true);
+  }
+
+  private String fshostname = "";
+  private int fsport = -1;
+  protected static final SimpleDateFormat df = ListPathsServlet.df;
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    setConf(conf);
+    this.fshostname = name.getHost();
+    this.fsport = name.getPort() != -1
+      ? name.getPort()
+      : conf.getInt("dfs.info.port", -1);
+  }
+
+  @Override
+  public URI getUri() {
+    try {
+      return new URI("hftp", null, fshostname, fsport, null, null, null);
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int buffersize) throws IOException {
+    HttpURLConnection connection = null;
+    try {
+      final URL url = new URI("http", null, fshostname, fsport,
+          "/data" + f.toUri().getPath(), null, null).toURL();
+      connection = (HttpURLConnection)url.openConnection();
+      connection.setRequestMethod("GET");
+      connection.connect();
+    } catch (URISyntaxException e) {
+      // IOException(Throwable) does not exist before Java 6
+      throw (IOException)new IOException(e.toString()).initCause(e);
+    }
+    final InputStream in = connection.getInputStream();
+    return new FSDataInputStream(new FSInputStream() {
+        public int read() throws IOException {
+          return in.read();
+        }
+        public int read(byte[] b, int off, int len) throws IOException {
+          return in.read(b, off, len);
+        }
+
+        public void close() throws IOException {
+          in.close();
+        }
+
+        public void seek(long pos) throws IOException {
+          throw new IOException("Can't seek!");
+        }
+        public long getPos() throws IOException {
+          throw new IOException("Position unknown!");
+        }
+        public boolean seekToNewSource(long targetPos) throws IOException {
+          return false;
+        }
+      });
+  }
+
+  /** Class to parse and store a listing reply from the server. */
+  class LsParser extends DefaultHandler {
+
+    ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
+
+    public void startElement(String ns, String localname, String qname,
+                Attributes attrs) throws SAXException {
+      if ("listing".equals(qname)) return;
+      if (!"file".equals(qname) && !"directory".equals(qname)) {
+        throw new SAXException("Unrecognized entry: " + qname);
+      }
+      long modif;
+      try {
+        modif = df.parse(attrs.getValue("modified")).getTime();
+      } catch (ParseException e) { throw new SAXException(e); }
+      FileStatus fs = "file".equals(qname)
+        ? new FileStatus(
+              Long.valueOf(attrs.getValue("size")).longValue(), false,
+              Short.valueOf(attrs.getValue("replication")).shortValue(),
+              Long.valueOf(attrs.getValue("blocksize")).longValue(),
+              modif, new Path("hftp", fshostname + ":" + fsport,
+              attrs.getValue("path")))
+        : new FileStatus(0L, true, 0, 0L,
+              modif, new Path("hftp", fshostname + ":" + fsport,
+              attrs.getValue("path")));
+      fslist.add(fs);
+    }
+
+    private void fetchList(String path, boolean recur) throws IOException {
+      try {
+        XMLReader xr = XMLReaderFactory.createXMLReader();
+        xr.setContentHandler(this);
+        final URL url = new URI("http", null, fshostname, fsport,
+            "/listPaths" + path, recur ? "recursive=yes" : null , null).toURL();
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+        connection.setRequestMethod("GET");
+        connection.connect();
+
+        InputStream resp = connection.getInputStream();
+        xr.parse(new InputSource(resp));
+      } catch (Exception e) {
+        // IOException(Throwable) does not exist before Java 6
+        throw (IOException)new IOException(e.toString()).initCause(e);
+      }
+    }
+
+    public FileStatus getFileStatus(Path f) throws IOException {
+      fetchList(f.toUri().getPath(), false);
+      if (fslist.size() == 0) {
+        throw new IOException("File does not exist: " + f);
+      }
+      return fslist.get(0);
+    }
+
+    public FileStatus[] listStatus(Path f, boolean recur) throws IOException {
+      fetchList(f.toUri().getPath(), recur);
+      if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDir())) {
+        fslist.remove(0);
+      }
+      return fslist.toArray(new FileStatus[0]);
+    }
+
+    public FileStatus[] listStatus(Path f) throws IOException {
+      return listStatus(f, false);
+    }
+  }
+
+  @Override
+  public boolean exists(Path f) throws IOException {
+    try {  // getFileStatus throws when the path is absent
+      return new LsParser().getFileStatus(f) != null;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    LsParser lsparser = new LsParser();
+    return lsparser.listStatus(f);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    LsParser lsparser = new LsParser();
+    return lsparser.getFileStatus(f);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return new Path("/").makeQualified(this);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path f) { }
+
+  @Override
+  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
+                                   short replication, long blockSize,
+                                   Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  @Override
+  public boolean delete(Path f) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  @Override
+  public boolean mkdirs(Path f) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+}

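Assuming the hftp scheme is wired to this class in the client configuration
(e.g. an fs.hftp.impl property naming org.apache.hadoop.dfs.HftpFileSystem;
that registration is not part of this commit), reads go through the generic
FileSystem API. A minimal client sketch with placeholder host, port, and
paths:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HftpExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // namenode.example.com:50070 is a placeholder namenode info address.
        FileSystem fs = FileSystem.get(
            URI.create("hftp://namenode.example.com:50070"), conf);

        // Listing is served by ListPathsServlet.
        for (FileStatus stat : fs.listStatus(new Path("/user/data"))) {
          System.out.println(stat.getPath() + "\t" + stat.getLen());
        }

        // Opening a file hits FileDataServlet, which redirects to a datanode.
        // The stream is sequential only: seek() throws IOException.
        FSDataInputStream in = fs.open(new Path("/user/data/part-00000"));
        byte[] buf = new byte[4096];
        for (int n; (n = in.read(buf)) > 0; ) {
          System.out.write(buf, 0, n);
        }
        in.close();
      }
    }
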
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java?view=auto&rev=564804
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java Fri Aug 10 16:41:10 2007
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.util.VersionInfo;
+
+import org.znerd.xmlenc.*;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Obtain meta-information about a filesystem.
+ * @see org.apache.hadoop.dfs.HftpFileSystem
+ */
+public class ListPathsServlet extends HttpServlet {
+
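+  // Note: SimpleDateFormat is not thread-safe; concurrent requests share
+  // this instance (as does HftpFileSystem), so access may need to be
+  // synchronized.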
+  static final SimpleDateFormat df =
+    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
+  static {
+    df.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+
+  /**
+   * Write a node to output.
+   * Dir: path, modification
+   * File: path, size, replication, blocksize, and modification
+   */
+  protected void writeItem(DFSFileInfo i, XMLOutputter doc, NameNode nn)
+      throws IOException, URISyntaxException {
+    doc.startTag(i.isDir() ? "directory" : "file");
+    doc.attribute("path", i.getPath().toUri().getPath());
+    doc.attribute("modified", df.format(new Date(i.getModificationTime())));
+    if (!i.isDir()) {
+      doc.attribute("size", String.valueOf(i.getLen()));
+      doc.attribute("replication", String.valueOf(i.getReplication()));
+      doc.attribute("blocksize", String.valueOf(i.getBlockSize()));
+    }
+    doc.endTag();
+  }
+
+  /**
+   * Build a map from the query string, setting values and defaults.
+   */
+  protected Map<String,String> buildRoot(HttpServletRequest request,
+      XMLOutputter doc) {
+    final String path = request.getPathInfo() != null
+      ? request.getPathInfo() : "/";
+    final String exclude = request.getParameter("exclude") != null
+      ? request.getParameter("exclude") : "\\..*\\.crc";
+    final String filter = request.getParameter("filter") != null
+      ? request.getParameter("filter") : ".*";
+    final boolean recur = request.getParameter("recursive") != null
+      && "yes".equals(request.getParameter("recursive"));
+
+    Map<String, String> root = new HashMap<String, String>();
+    root.put("path", path);
+    root.put("recursive", recur ? "yes" : "no");
+    root.put("filter", filter);
+    root.put("exclude", exclude);
+    root.put("time", df.format(new Date()));
+    root.put("version", VersionInfo.getVersion());
+    return root;
+  }
+
+  /**
+   * Service a GET request as described below.
+   * Request:
+   * {@code
+   * GET http://<nn>:<port>/listPaths[/<path>][<?option>[&option]*] HTTP/1.1
+   * }
+   *
+ * Where <i>option</i> (default) is one of:
+   * recursive (&quot;no&quot;)
+   * filter (&quot;.*&quot;)
+   * exclude (&quot;\..*\.crc&quot;)
+   *
+   * Response: A flat list of files/directories in the following format:
+   * {@code
+ *   <listing path="..." recursive="(yes|no)" filter="..." exclude="..."
+ *            time="yyyy-MM-dd'T'HH:mm:ssZ" version="...">
+ *     <directory path="..." modified="yyyy-MM-dd'T'HH:mm:ssZ"/>
+ *     <file path="..." modified="yyyy-MM-dd'T'HH:mm:ssZ" blocksize="..."
+   *           replication="..." size="..."/>
+   *   </listing>
+   * }
+   */
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+    throws ServletException, IOException {
+
+    final PrintWriter out = response.getWriter();
+    final XMLOutputter doc = new XMLOutputter(out, "UTF-8");
+    try {
+      final Map<String, String> root = buildRoot(request, doc);
+      final String path = root.get("path");
+      final boolean recur = "yes".equals(root.get("recursive"));
+      final Pattern filter = Pattern.compile(root.get("filter"));
+      final Pattern exclude = Pattern.compile(root.get("exclude"));
+      final NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+      doc.declaration();
+      doc.startTag("listing");
+      for (Map.Entry<String,String> m : root.entrySet()) {
+        doc.attribute(m.getKey(), m.getValue());
+      }
+
+      DFSFileInfo base = nn.getFileInfo(path);
+      if (base.isDir()) {
+        writeItem(base, doc, nn);
+      }
+
+      Stack<String> pathstack = new Stack<String>();
+      pathstack.push(path);
+      while (!pathstack.empty()) {
+        for (DFSFileInfo i : nn.getListing(pathstack.pop())) {
+          if (exclude.matcher(i.getName()).matches()
+              || !filter.matcher(i.getName()).matches()) {
+            continue;
+          }
+          if (recur && i.isDir()) {
+            pathstack.push(i.getPath().toUri().getPath());
+          }
+          writeItem(i, doc, nn);
+        }
+      }
+
+    } catch (URISyntaxException e) {
+      out.println(e.toString());
+    } catch (PatternSyntaxException e) {
+      out.println(e.toString());
+    } finally {
+      if (doc != null) {
+        doc.endDocument();
+      }
+
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+}
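
The options documented on doGet compose in the query string. A sketch of
fetching a recursive listing restricted to .txt files (host, port, and path
are placeholders; %5C is the URL-encoded backslash of the filter regex):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class ListPathsExample {
      public static void main(String[] args) throws Exception {
        // The filter parameter decodes to .*\.txt on the server side.
        URL url = new URL("http://namenode.example.com:50070"
            + "/listPaths/user/data?recursive=yes&filter=.*%5C.txt");
        BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), "UTF-8"));
        for (String line; (line = in.readLine()) != null; ) {
          System.out.println(line);  // one <file>/<directory> element per entry
        }
        in.close();
      }
    }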