You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/06/28 23:46:13 UTC

svn commit: r1355174 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/namenode/

Author: todd
Date: Thu Jun 28 21:46:12 2012
New Revision: 1355174

URL: http://svn.apache.org/viewvc?rev=1355174&view=rev
Log:
HDFS-3571. Allow EditLogFileInputStream to read from a remote URL. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1355174&r1=1355173&r2=1355174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jun 28 21:46:12 2012
@@ -97,6 +97,8 @@ Trunk (unreleased changes)
     BlockPlacementPolicyDefault extensible for reusing code in subclasses.
     (Junping Du via szetszwo)
 
+    HDFS-3571. Allow EditLogFileInputStream to read from a remote URL (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1355174&r1=1355173&r2=1355174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Thu Jun 28 21:46:12 2012
@@ -18,27 +18,37 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.BufferedInputStream;
-import java.io.EOFException;
-import java.io.DataInputStream;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.SecurityUtil;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 
 /**
  * An implementation of the abstract class {@link EditLogInputStream}, which
- * reads edits from a local file.
+ * reads edits from a file. That file may be either on the local disk or
+ * accessible via a URL.
  */
+@InterfaceAudience.Private
 public class EditLogFileInputStream extends EditLogInputStream {
-  private final File file;
+  private final LogSource log;
   private final long firstTxId;
   private final long lastTxId;
   private final boolean isInProgress;
@@ -48,7 +58,7 @@ public class EditLogFileInputStream exte
     CLOSED
   }
   private State state = State.UNINIT;
-  private FileInputStream fStream = null;
+  private InputStream fStream = null;
   private int logVersion = 0;
   private FSEditLogOp.Reader reader = null;
   private FSEditLogLoader.PositionTrackingInputStream tracker = null;
@@ -81,7 +91,29 @@ public class EditLogFileInputStream exte
    */
   public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
       boolean isInProgress) {
-    this.file = name;
+    this(new FileLog(name), firstTxId, lastTxId, isInProgress);
+  }
+  
+  /**
+   * Open an EditLogInputStream for the given URL.
+   *
+   * @param url the HTTP url hosting the log; must return a Content-Length
+   * @param startTxId the expected starting txid
+   * @param endTxId the expected ending txid
+   * @param inProgress whether the log is in-progress
+   * @return a stream from which edits may be read
+   */
+  public static EditLogInputStream fromUrl(URL url, long startTxId,
+      long endTxId, boolean inProgress) {
+    return new EditLogFileInputStream(new URLLog(url),
+        startTxId, endTxId, inProgress);
+  }
+  
+  private EditLogFileInputStream(LogSource log,
+      long firstTxId, long lastTxId,
+      boolean isInProgress) {
+      
+    this.log = log;
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
     this.isInProgress = isInProgress;
@@ -91,7 +123,7 @@ public class EditLogFileInputStream exte
     Preconditions.checkState(state == State.UNINIT);
     BufferedInputStream bin = null;
     try {
-      fStream = new FileInputStream(file);
+      fStream = log.getInputStream();
       bin = new BufferedInputStream(fStream);
       tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
       dataIn = new DataInputStream(tracker);
@@ -122,7 +154,7 @@ public class EditLogFileInputStream exte
 
   @Override
   public String getName() {
-    return file.getPath();
+    return log.getName();
   }
 
   private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
@@ -160,7 +192,7 @@ public class EditLogFileInputStream exte
           // we were supposed to read out of the stream.
           // So we force an EOF on all subsequent reads.
           //
-          long skipAmt = file.length() - tracker.getPos();
+          long skipAmt = log.length() - tracker.getPos();
           if (skipAmt > 0) {
             LOG.warn("skipping " + skipAmt + " bytes at the end " +
               "of edit log  '" + getName() + "': reached txid " + txId +
@@ -220,7 +252,7 @@ public class EditLogFileInputStream exte
   @Override
   public long length() throws IOException {
     // file size + size of both buffers
-    return file.length();
+    return log.length();
   }
   
   @Override
@@ -291,4 +323,121 @@ public class EditLogFileInputStream exte
       super(msg);
     }
   }
+  
+  /**
+   * Abstraction over where the bytes of an edit log come from, so that the
+   * stream can read a local file or a remote URL through the same code path.
+   */
+  private interface LogSource {
+    /** Open a new stream over the raw bytes of the log. */
+    public InputStream getInputStream() throws IOException;
+    /** @return the size of the log in bytes */
+    public long length();
+    /** @return a name identifying the log, e.g. for log messages */
+    public String getName();
+  }
+  
+  /** A {@link LogSource} backed by a {@link File}. */
+  private static class FileLog implements LogSource {
+    private final File file;
+    
+    public FileLog(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      return new FileInputStream(file);
+    }
+
+    @Override
+    public long length() {
+      return file.length();
+    }
+
+    @Override
+    public String getName() {
+      return file.getPath();
+    }
+  }
+
+  /**
+   * A {@link LogSource} fetched over HTTP. The size of the log is taken from
+   * the Content-Length header of the response, so {@link #length()} is only
+   * available once {@link #getInputStream()} has succeeded.
+   */
+  private static class URLLog implements LogSource {
+    private final URL url;
+    // -1 until a response carrying a valid Content-Length has been received.
+    private long advertisedSize = -1;
+
+    private final static String CONTENT_LENGTH = "Content-Length";
+
+    public URLLog(URL url) {
+      this.url = url;
+    }
+
+    /**
+     * Fetch the URL and record the size the server advertises.
+     *
+     * @throws HttpGetFailedException if the response status is not HTTP_OK
+     * @throws IOException if the Content-Length header is missing, is not
+     *         a number, or is not positive
+     */
+    @Override
+    public InputStream getInputStream() throws IOException {
+      HttpURLConnection connection = (HttpURLConnection)
+          SecurityUtil.openSecureHttpConnection(url);
+      
+      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+        throw new HttpGetFailedException(
+            "Fetch of " + url +
+            " failed with status code " + connection.getResponseCode() +
+            "\nResponse message:\n" + connection.getResponseMessage(),
+            connection);
+      }
+
+      String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+      if (contentLength == null) {
+        throw new IOException(CONTENT_LENGTH + " header is not provided " +
+                              "by the server when trying to fetch " + url);
+      }
+
+      // Parse into a local first so a rejected header never leaves a bogus
+      // value in advertisedSize for length() to return.
+      long size;
+      try {
+        size = Long.parseLong(contentLength);
+      } catch (NumberFormatException nfe) {
+        // Report a malformed header as an IOException like the other
+        // bad-response cases, not as an unchecked exception.
+        throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+            contentLength, nfe);
+      }
+      if (size <= 0) {
+        throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+            contentLength);
+      }
+      advertisedSize = size;
+
+      return connection.getInputStream();
+    }
+
+    /**
+     * @return the size advertised by the server's Content-Length header
+     * @throws IllegalStateException if no stream has been opened yet
+     */
+    @Override
+    public long length() {
+      Preconditions.checkState(advertisedSize != -1,
+          "must get input stream before length is available");
+      return advertisedSize;
+    }
+
+    @Override
+    public String getName() {
+      return url.toString();
+    }
+  }
+  
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java?rev=1355174&r1=1355173&r2=1355174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java Thu Jun 28 21:46:12 2012
@@ -229,28 +229,34 @@ public abstract class FSImageTestUtil {
    */
   public static EnumMap<FSEditLogOpCodes,Holder<Integer>> countEditLogOpTypes(
       File editLog) throws Exception {
-    EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
-      new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
-
     EditLogInputStream elis = new EditLogFileInputStream(editLog);
     try {
-      FSEditLogOp op;
-      while ((op = elis.readOp()) != null) {
-        Holder<Integer> i = opCounts.get(op.opCode);
-        if (i == null) {
-          i = new Holder<Integer>(0);
-          opCounts.put(op.opCode, i);
-        }
-        i.held++;
-      }
+      return countEditLogOpTypes(elis);
     } finally {
       IOUtils.closeStream(elis);
     }
+  }
+
+  /**
+   * Stream variant of {@link #countEditLogOpTypes(File)}; does not close elis.
+   */
+  public static EnumMap<FSEditLogOpCodes, Holder<Integer>> countEditLogOpTypes(
+      EditLogInputStream elis) throws IOException {
+    EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
+        new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
     
+    FSEditLogOp op;
+    while ((op = elis.readOp()) != null) {
+      Holder<Integer> i = opCounts.get(op.opCode);
+      if (i == null) {
+        i = new Holder<Integer>(0);
+        opCounts.put(op.opCode, i);
+      }
+      i.held++;
+    }
     return opCounts;
   }
 
-  
   /**
    * Assert that all of the given directories have the same newest filename
    * for fsimage that they hold the same data.

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java?rev=1355174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java Thu Jun 28 21:46:12 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.EnumMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.http.HttpServer;
+import org.junit.Test;
+
+/**
+ * Tests that {@link EditLogFileInputStream} can read an edit log served over
+ * HTTP.
+ */
+public class TestEditLogFileInputStream {
+  private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
+
+  @Test
+  public void testReadURL() throws Exception {
+    // Start a simple web server which hosts the log data.
+    HttpServer server = new HttpServer("test", "0.0.0.0", 0, true);
+    server.start();
+    try {
+      server.addServlet("fakeLog", "/fakeLog", FakeLogServlet.class);
+      URL url = new URL("http://localhost:" + server.getPort() + "/fakeLog");
+      EditLogInputStream elis = EditLogFileInputStream.fromUrl(
+          url, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
+          false);
+      try {
+        // Read the edit log and verify that we got all of the data.
+        EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
+            FSImageTestUtil.countEditLogOpTypes(elis);
+        assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_ADD).held);
+        assertEquals(1L,
+            (long)counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP).held);
+        assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
+
+        // Check that length header was picked up.
+        assertEquals(FAKE_LOG_DATA.length, elis.length());
+      } finally {
+        // Close even when an assertion above fails, so the stream is
+        // never leaked.
+        elis.close();
+      }
+    } finally {
+      server.stop();
+    }
+  }
+
+  /** Servlet which serves {@code FAKE_LOG_DATA} with a Content-Length. */
+  @SuppressWarnings("serial")
+  public static class FakeLogServlet extends HttpServlet {
+    @Override
+    public void doGet(HttpServletRequest request, 
+                      HttpServletResponse response
+                      ) throws ServletException, IOException {
+      response.setHeader("Content-Length",
+          String.valueOf(FAKE_LOG_DATA.length));
+      OutputStream out = response.getOutputStream();
+      out.write(FAKE_LOG_DATA);
+      out.close();
+    }
+  }
+
+}