You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink ("here") that was lost when the page was converted to plain text.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/06/28 23:46:13 UTC
svn commit: r1355174 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/test/java/org/apache/hadoop/hdfs/server/namenode/
Author: todd
Date: Thu Jun 28 21:46:12 2012
New Revision: 1355174
URL: http://svn.apache.org/viewvc?rev=1355174&view=rev
Log:
HDFS-3571. Allow EditLogFileInputStream to read from a remote URL. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1355174&r1=1355173&r2=1355174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jun 28 21:46:12 2012
@@ -97,6 +97,8 @@ Trunk (unreleased changes)
BlockPlacementPolicyDefault extensible for reusing code in subclasses.
(Junping Du via szetszwo)
+ HDFS-3571. Allow EditLogFileInputStream to read from a remote URL (todd)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1355174&r1=1355173&r2=1355174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Thu Jun 28 21:46:12 2012
@@ -18,27 +18,37 @@
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.BufferedInputStream;
-import java.io.EOFException;
-import java.io.DataInputStream;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.SecurityUtil;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
/**
* An implementation of the abstract class {@link EditLogInputStream}, which
- * reads edits from a local file.
+ * reads edits from a file. That file may be either on the local disk or
+ * accessible via a URL.
*/
+@InterfaceAudience.Private
public class EditLogFileInputStream extends EditLogInputStream {
- private final File file;
+ private final LogSource log;
private final long firstTxId;
private final long lastTxId;
private final boolean isInProgress;
@@ -48,7 +58,7 @@ public class EditLogFileInputStream exte
CLOSED
}
private State state = State.UNINIT;
- private FileInputStream fStream = null;
+ private InputStream fStream = null;
private int logVersion = 0;
private FSEditLogOp.Reader reader = null;
private FSEditLogLoader.PositionTrackingInputStream tracker = null;
@@ -81,7 +91,29 @@ public class EditLogFileInputStream exte
*/
public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
boolean isInProgress) {
- this.file = name;
+ this(new FileLog(name), firstTxId, lastTxId, isInProgress);
+ }
+
+ /**
+ * Open an EditLogInputStream for the given URL.
+ *
+ * @param url the url hosting the log
+ * @param startTxId the expected starting txid
+ * @param endTxId the expected ending txid
+ * @param inProgress whether the log is in-progress
+ * @return a stream from which edits may be read
+ */
+ public static EditLogInputStream fromUrl(URL url, long startTxId,
+ long endTxId, boolean inProgress) {
+ return new EditLogFileInputStream(new URLLog(url),
+ startTxId, endTxId, inProgress);
+ }
+
+ private EditLogFileInputStream(LogSource log,
+ long firstTxId, long lastTxId,
+ boolean isInProgress) {
+
+ this.log = log;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.isInProgress = isInProgress;
@@ -91,7 +123,7 @@ public class EditLogFileInputStream exte
Preconditions.checkState(state == State.UNINIT);
BufferedInputStream bin = null;
try {
- fStream = new FileInputStream(file);
+ fStream = log.getInputStream();
bin = new BufferedInputStream(fStream);
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
dataIn = new DataInputStream(tracker);
@@ -122,7 +154,7 @@ public class EditLogFileInputStream exte
@Override
public String getName() {
- return file.getPath();
+ return log.getName();
}
private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
@@ -160,7 +192,7 @@ public class EditLogFileInputStream exte
// we were supposed to read out of the stream.
// So we force an EOF on all subsequent reads.
//
- long skipAmt = file.length() - tracker.getPos();
+ long skipAmt = log.length() - tracker.getPos();
if (skipAmt > 0) {
LOG.warn("skipping " + skipAmt + " bytes at the end " +
"of edit log '" + getName() + "': reached txid " + txId +
@@ -220,7 +252,7 @@ public class EditLogFileInputStream exte
@Override
public long length() throws IOException {
// file size + size of both buffers
- return file.length();
+ return log.length();
}
@Override
@@ -291,4 +323,85 @@ public class EditLogFileInputStream exte
super(msg);
}
}
+
+ private interface LogSource {
+ public InputStream getInputStream() throws IOException;
+ public long length();
+ public String getName();
+ }
+
+ private static class FileLog implements LogSource {
+ private final File file;
+
+ public FileLog(File file) {
+ this.file = file;
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new FileInputStream(file);
+ }
+
+ @Override
+ public long length() {
+ return file.length();
+ }
+
+ @Override
+ public String getName() {
+ return file.getPath();
+ }
+ }
+
+ private static class URLLog implements LogSource {
+ private final URL url;
+ private long advertisedSize = -1;
+
+ private final static String CONTENT_LENGTH = "Content-Length";
+
+ public URLLog(URL url) {
+ this.url = url;
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ HttpURLConnection connection = (HttpURLConnection)
+ SecurityUtil.openSecureHttpConnection(url);
+
+ if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ throw new HttpGetFailedException(
+ "Fetch of " + url +
+ " failed with status code " + connection.getResponseCode() +
+ "\nResponse message:\n" + connection.getResponseMessage(),
+ connection);
+ }
+
+ String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+ if (contentLength != null) {
+ advertisedSize = Long.parseLong(contentLength);
+ if (advertisedSize <= 0) {
+ throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+ contentLength);
+ }
+ } else {
+ throw new IOException(CONTENT_LENGTH + " header is not provided " +
+ "by the server when trying to fetch " + url);
+ }
+
+ return connection.getInputStream();
+ }
+
+ @Override
+ public long length() {
+ Preconditions.checkState(advertisedSize != -1,
+ "must get input stream before length is available");
+ return advertisedSize;
+ }
+
+ @Override
+ public String getName() {
+ return url.toString();
+ }
+ }
+
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java?rev=1355174&r1=1355173&r2=1355174&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java Thu Jun 28 21:46:12 2012
@@ -229,28 +229,34 @@ public abstract class FSImageTestUtil {
*/
public static EnumMap<FSEditLogOpCodes,Holder<Integer>> countEditLogOpTypes(
File editLog) throws Exception {
- EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
- new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
-
EditLogInputStream elis = new EditLogFileInputStream(editLog);
try {
- FSEditLogOp op;
- while ((op = elis.readOp()) != null) {
- Holder<Integer> i = opCounts.get(op.opCode);
- if (i == null) {
- i = new Holder<Integer>(0);
- opCounts.put(op.opCode, i);
- }
- i.held++;
- }
+ return countEditLogOpTypes(elis);
} finally {
IOUtils.closeStream(elis);
}
+ }
+
+ /**
+ * @see #countEditLogOpTypes(File)
+ */
+ public static EnumMap<FSEditLogOpCodes, Holder<Integer>> countEditLogOpTypes(
+ EditLogInputStream elis) throws IOException {
+ EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
+ new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
+ FSEditLogOp op;
+ while ((op = elis.readOp()) != null) {
+ Holder<Integer> i = opCounts.get(op.opCode);
+ if (i == null) {
+ i = new Holder<Integer>(0);
+ opCounts.put(op.opCode, i);
+ }
+ i.held++;
+ }
return opCounts;
}
-
/**
* Assert that all of the given directories have the same newest filename
* for fsimage that they hold the same data.
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java?rev=1355174&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java Thu Jun 28 21:46:12 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.EnumMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.http.HttpServer;
+import org.junit.Test;
+
+public class TestEditLogFileInputStream {
+ private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
+
+ @Test
+ public void testReadURL() throws Exception {
+ // Start a simple web server which hosts the log data.
+ HttpServer server = new HttpServer("test", "0.0.0.0", 0, true);
+ server.start();
+ try {
+ server.addServlet("fakeLog", "/fakeLog", FakeLogServlet.class);
+ URL url = new URL("http://localhost:" + server.getPort() + "/fakeLog");
+ EditLogInputStream elis = EditLogFileInputStream.fromUrl(
+ url, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
+ false);
+ // Read the edit log and verify that we got all of the data.
+ EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
+ FSImageTestUtil.countEditLogOpTypes(elis);
+ assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP).held);
+ assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
+
+ // Check that length header was picked up.
+ assertEquals(FAKE_LOG_DATA.length, elis.length());
+ elis.close();
+ } finally {
+ server.stop();
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class FakeLogServlet extends HttpServlet {
+ @Override
+ public void doGet(HttpServletRequest request,
+ HttpServletResponse response
+ ) throws ServletException, IOException {
+ response.setHeader("Content-Length",
+ String.valueOf(FAKE_LOG_DATA.length));
+ OutputStream out = response.getOutputStream();
+ out.write(FAKE_LOG_DATA);
+ out.close();
+ }
+ }
+
+}