Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/08 21:55:48 UTC
[hadoop] branch branch-2 updated: HDFS-14696. Backport HDFS-11273
to branch-2 (Move TransferFsImage#doGetUrl function to a Util class)
(#1251) Contributed by Siyao Meng.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 15062b6 HDFS-14696. Backport HDFS-11273 to branch-2 (Move TransferFsImage#doGetUrl function to a Util class) (#1251) Contributed by Siyao Meng.
15062b6 is described below
commit 15062b6d2882d162dabaf884933d9625fff5ae5f
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Thu Aug 8 14:55:41 2019 -0700
HDFS-14696. Backport HDFS-11273 to branch-2 (Move TransferFsImage#doGetUrl function to a Util class) (#1251) Contributed by Siyao Meng.
---
.../hdfs/server/common/HttpGetFailedException.java | 43 ++++
.../hdfs/server/common/HttpPutFailedException.java | 43 ++++
.../org/apache/hadoop/hdfs/server/common/Util.java | 261 ++++++++++++++++++-
.../server/namenode/EditLogFileInputStream.java | 3 +-
.../hadoop/hdfs/server/namenode/ImageServlet.java | 15 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 4 +-
.../hdfs/server/namenode/TransferFsImage.java | 286 ++-------------------
7 files changed, 374 insertions(+), 281 deletions(-)
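
For orientation before the diff: a minimal sketch (not part of the commit) of how a caller might invoke the relocated helper. The namenode URL, port, and destination path are illustrative only, and dstStorage may be null when downloading outside an NNStorage directory.

import java.io.File;
import java.net.URL;
import java.util.Collections;

import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.io.MD5Hash;

public class FetchImageExample {
  public static void main(String[] args) throws Exception {
    // Illustrative namenode address; 50070 is the branch-2 default HTTP port.
    URL imageUrl = new URL(
        "http://nn.example.com:50070/imagetransfer?getimage=1&txid=latest");
    File dst = new File("/tmp/fsimage.ckpt"); // hypothetical destination
    try {
      // getChecksum=true makes Util verify the X-MD5-Digest header;
      // the 60s timeout is passed straight through to Util.setTimeout().
      MD5Hash digest = Util.doGetUrl(imageUrl,
          Collections.singletonList(dst), null, true, 60000);
      System.out.println("Downloaded image, digest = " + digest);
    } catch (HttpGetFailedException e) {
      System.err.println("GET failed, HTTP status " + e.getResponseCode());
    }
  }
}
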
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
new file mode 100644
index 0000000..592bee2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+
+/**
+ * This exception is thrown when an HTTP GET operation fails.
+ *
+ */
+@InterfaceAudience.Private
+public class HttpGetFailedException extends IOException {
+ private static final long serialVersionUID = 1L;
+ private final int responseCode;
+
+ public HttpGetFailedException(String msg, HttpURLConnection connection)
+ throws IOException {
+ super(msg);
+ this.responseCode = connection.getResponseCode();
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
new file mode 100644
index 0000000..77a0ee1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+
+/**
+ * This exception is thrown when an HTTP PUT operation fails.
+ *
+ */
+@InterfaceAudience.Private
+public class HttpPutFailedException extends IOException {
+ private static final long serialVersionUID = 1L;
+ private final int responseCode;
+
+ public HttpPutFailedException(String msg, int responseCode)
+ throws IOException {
+ super(msg);
+ this.responseCode = responseCode;
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
index ad5d52c..8f7588f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -23,26 +23,64 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@InterfaceAudience.Private
public final class Util {
private final static Log LOG = LogFactory.getLog(Util.class.getName());
+ /* Required headers for FSImage transfer. */
+ public final static String FILE_LENGTH = "File-Length";
+ public final static String CONTENT_LENGTH = "Content-Length";
+ public final static String MD5_HEADER = "X-MD5-Digest";
+ public final static String CONTENT_TYPE = "Content-Type";
+ public final static String CONTENT_TRANSFER_ENCODING =
+ "Content-Transfer-Encoding";
+
+ public final static int IO_FILE_BUFFER_SIZE;
+ private static final boolean isSpnegoEnabled;
+ public static final URLConnectionFactory connectionFactory;
+
+ static {
+ Configuration conf = new Configuration();
+ connectionFactory = URLConnectionFactory
+ .newDefaultURLConnectionFactory(conf);
+ isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
+ IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
+ }
+
/**
* Interprets the passed string as a URI. In case of error it
* assumes the specified string is a file.
*
* @param s the string to interpret
- * @return the resulting URI
- * @throws IOException
+ * @return the resulting URI
*/
- public static URI stringAsURI(String s) throws IOException {
+ static URI stringAsURI(String s) throws IOException {
URI u = null;
// try to make a URI
try {
@@ -67,7 +105,6 @@ public final class Util {
*
* @param f the file to convert
* @return the resulting URI
- * @throws IOException
*/
public static URI fileAsURI(File f) throws IOException {
URI u = f.getCanonicalFile().toURI();
@@ -92,7 +129,7 @@ public final class Util {
*/
public static List<URI> stringCollectionAsURIs(
Collection<String> names) {
- List<URI> uris = new ArrayList<URI>(names.size());
+ List<URI> uris = new ArrayList<>(names.size());
for(String name : names) {
try {
uris.add(stringAsURI(name));
@@ -103,7 +140,6 @@ public final class Util {
return uris;
}
-
public static boolean isDiskStatsEnabled(int fileIOSamplingPercentage) {
final boolean isEnabled;
if (fileIOSamplingPercentage <= 0) {
@@ -120,4 +156,217 @@ public final class Util {
return isEnabled;
}
+
+ /**
+ * Downloads the file at the specified URL into the destination
+ * storage.
+ */
+ public static MD5Hash doGetUrl(URL url, List<File> localPaths,
+ Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
+ HttpURLConnection connection;
+ try {
+ connection = (HttpURLConnection)
+ connectionFactory.openConnection(url, isSpnegoEnabled);
+ } catch (AuthenticationException e) {
+ throw new IOException(e);
+ }
+
+ setTimeout(connection, timeout);
+
+ if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ throw new HttpGetFailedException("Image transfer servlet at " + url +
+ " failed with status code " + connection.getResponseCode() +
+ "\nResponse message:\n" + connection.getResponseMessage(),
+ connection);
+ }
+
+ long advertisedSize;
+ String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+ if (contentLength != null) {
+ advertisedSize = Long.parseLong(contentLength);
+ } else {
+ throw new IOException(CONTENT_LENGTH + " header is not provided " +
+ "by the namenode when trying to fetch " + url);
+ }
+ MD5Hash advertisedDigest = parseMD5Header(connection);
+ String fsImageName = connection
+ .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
+ InputStream stream = connection.getInputStream();
+
+ return receiveFile(url.toExternalForm(), localPaths, dstStorage,
+ getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
+ null);
+ }
+
+ /**
+ * Receives a file at the given URL from the input stream and writes it to
+ * the specified destination storage locations.
+ */
+ public static MD5Hash receiveFile(String url, List<File> localPaths,
+ Storage dstStorage, boolean getChecksum, long advertisedSize,
+ MD5Hash advertisedDigest, String fsImageName, InputStream stream,
+ DataTransferThrottler throttler) throws
+ IOException {
+ long startTime = Time.monotonicNow();
+ Map<FileOutputStream, File> streamPathMap = new HashMap<>();
+ StringBuilder xferStats = new StringBuilder();
+ double xferCombined = 0;
+ if (localPaths != null) {
+ // If the local paths refer to directories, use the server-provided header
+ // as the filename within that directory
+ List<File> newLocalPaths = new ArrayList<>();
+ for (File localPath : localPaths) {
+ if (localPath.isDirectory()) {
+ if (fsImageName == null) {
+ throw new IOException("No filename header provided by server");
+ }
+ newLocalPaths.add(new File(localPath, fsImageName));
+ } else {
+ newLocalPaths.add(localPath);
+ }
+ }
+ localPaths = newLocalPaths;
+ }
+
+
+ long received = 0;
+ MessageDigest digester = null;
+ if (getChecksum) {
+ digester = MD5Hash.getDigester();
+ stream = new DigestInputStream(stream, digester);
+ }
+ boolean finishedReceiving = false;
+
+ List<FileOutputStream> outputStreams = Lists.newArrayList();
+
+ try {
+ if (localPaths != null) {
+ for (File f : localPaths) {
+ try {
+ if (f.exists()) {
+ LOG.warn("Overwriting existing file " + f
+ + " with file downloaded from " + url);
+ }
+ FileOutputStream fos = new FileOutputStream(f);
+ outputStreams.add(fos);
+ streamPathMap.put(fos, f);
+ } catch (IOException ioe) {
+ LOG.warn("Unable to download file " + f, ioe);
+ // This will be null if we're downloading the fsimage to a file
+ // outside of an NNStorage directory.
+ if (dstStorage != null &&
+ (dstStorage instanceof StorageErrorReporter)) {
+ ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
+ }
+ }
+ }
+
+ if (outputStreams.isEmpty()) {
+ throw new IOException(
+ "Unable to download to any storage directory");
+ }
+ }
+
+ int num = 1;
+ byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
+ while (num > 0) {
+ num = stream.read(buf);
+ if (num > 0) {
+ received += num;
+ for (FileOutputStream fos : outputStreams) {
+ fos.write(buf, 0, num);
+ }
+ if (throttler != null) {
+ throttler.throttle(num);
+ }
+ }
+ }
+ finishedReceiving = true;
+ double xferSec = Math.max(
+ ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
+ long xferKb = received / 1024;
+ xferCombined += xferSec;
+ xferStats.append(
+ String.format(" The fsimage download took %.2fs at %.2f KB/s.",
+ xferSec, xferKb / xferSec));
+ } finally {
+ stream.close();
+ for (FileOutputStream fos : outputStreams) {
+ long flushStartTime = Time.monotonicNow();
+ fos.getChannel().force(true);
+ fos.close();
+ double writeSec = Math.max(((float)
+ (Time.monotonicNow() - flushStartTime)) / 1000.0, 0.001);
+ xferCombined += writeSec;
+ xferStats.append(String
+ .format(" Synchronous (fsync) write to disk of " +
+ streamPathMap.get(fos).getAbsolutePath() +
+ " took %.2fs.", writeSec));
+ }
+
+ // Something went wrong and did not finish reading.
+ // Remove the temporary files.
+ if (!finishedReceiving) {
+ deleteTmpFiles(localPaths);
+ }
+
+ if (finishedReceiving && received != advertisedSize) {
+ // only throw this exception if we think we read all of it on our end
+ // -- otherwise a client-side IOException would be masked by this
+ // exception that makes it look like a server-side problem!
+ deleteTmpFiles(localPaths);
+ throw new IOException("File " + url + " received length " + received +
+ " is not of the advertised size " +
+ advertisedSize);
+ }
+ }
+ xferStats.insert(0, String.format("Combined time for fsimage download and" +
+ " fsync to all disks took %.2fs.", xferCombined));
+ LOG.info(xferStats.toString());
+
+ if (digester != null) {
+ MD5Hash computedDigest = new MD5Hash(digester.digest());
+
+ if (advertisedDigest != null &&
+ !computedDigest.equals(advertisedDigest)) {
+ deleteTmpFiles(localPaths);
+ throw new IOException("File " + url + " computed digest " +
+ computedDigest + " does not match advertised digest " +
+ advertisedDigest);
+ }
+ return computedDigest;
+ } else {
+ return null;
+ }
+ }
+
+ private static void deleteTmpFiles(List<File> files) {
+ if (files == null) {
+ return;
+ }
+
+ LOG.info("Deleting temporary files: " + files);
+ for (File file : files) {
+ if (!file.delete()) {
+ LOG.warn("Deleting " + file + " has failed");
+ }
+ }
+ }
+
+ /**
+ * Sets a timeout value in milliseconds for the HTTP connection.
+ * @param connection the HTTP connection for which timeout needs to be set
+ * @param timeout value to be set as timeout in milliseconds
+ */
+ public static void setTimeout(HttpURLConnection connection, int timeout) {
+ if (timeout > 0) {
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ }
+ }
+
+ private static MD5Hash parseMD5Header(HttpURLConnection connection) {
+ String header = connection.getHeaderField(MD5_HEADER);
+ return (header != null) ? new MD5Hash(header) : null;
+ }
}
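
Since receiveFile is now public on Util as well, it can be driven directly with any InputStream and an optional DataTransferThrottler. A minimal sketch (not part of the commit) that copies a local file through it with a ~1 MB/s cap; the /tmp paths are hypothetical:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;

import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;

public class ThrottledReceiveExample {
  public static void main(String[] args) throws IOException {
    File src = new File("/tmp/fsimage.src");  // hypothetical source
    File dst = new File("/tmp/fsimage.copy"); // hypothetical destination
    DataTransferThrottler throttler = new DataTransferThrottler(1024 * 1024);
    try (InputStream in = new FileInputStream(src)) {
      // advertisedSize must equal the stream length, or receiveFile
      // deletes the partial copies and throws; advertisedDigest=null
      // skips the comparison but still returns the computed MD5.
      MD5Hash digest = Util.receiveFile("file://" + src.getAbsolutePath(),
          Collections.singletonList(dst), null /* dstStorage */,
          true /* getChecksum */, src.length(), null /* advertisedDigest */,
          dst.getName(), in, throttler);
      System.out.println("MD5 = " + digest);
    }
  }
}
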
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 33a3131..af72027f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -36,9 +36,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
-import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index 9223abd..fdd8d70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import org.apache.hadoop.hdfs.server.common.Util;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.net.HttpURLConnection;
@@ -304,11 +305,12 @@ public class ImageServlet extends HttpServlet {
*/
public static void setVerificationHeadersForGet(HttpServletResponse response,
File file) throws IOException {
- response.setHeader(TransferFsImage.CONTENT_LENGTH,
+ response.setHeader(
+ Util.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
- response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
+ response.setHeader(Util.MD5_HEADER, hash.toString());
}
}
@@ -437,12 +439,13 @@ public class ImageServlet extends HttpServlet {
*/
static void setVerificationHeadersForPut(HttpURLConnection connection,
File file) throws IOException {
- connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
+ connection.setRequestProperty(
+ Util.CONTENT_LENGTH,
String.valueOf(file.length()));
MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
if (hash != null) {
connection
- .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
+ .setRequestProperty(Util.MD5_HEADER, hash.toString());
}
}
@@ -462,7 +465,7 @@ public class ImageServlet extends HttpServlet {
params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
// setting the length of the file to be uploaded in separate property as
// Content-Length only supports up to 2GB
- params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
+ params.put(Util.FILE_LENGTH, Long.toString(imageFileSize));
params.put(IMAGE_FILE_TYPE, nnf.name());
return params;
}
@@ -586,7 +589,7 @@ public class ImageServlet extends HttpServlet {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
fileSize = ServletUtil.parseLongParam(request,
- TransferFsImage.FILE_LENGTH);
+ Util.FILE_LENGTH);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 830320a..9f9ad26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
+
import static org.apache.hadoop.util.Time.now;
import java.io.FileNotFoundException;
@@ -137,6 +138,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -2117,7 +2119,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
} catch (FileNotFoundException e) {
LOG.debug("Tried to read from deleted or moved edit log segment", e);
return null;
- } catch (TransferFsImage.HttpGetFailedException e) {
+ } catch (HttpGetFailedException e) {
LOG.debug("Tried to read from deleted edit log segment", e);
return null;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 65200fd..1ceb704 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -19,18 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
-import java.security.DigestInputStream;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -44,17 +38,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.HttpPutFailedException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
@@ -66,6 +59,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.mortbay.jetty.EofException;
+import static org.apache.hadoop.hdfs.server.common.Util.IO_FILE_BUFFER_SIZE;
+import static org.apache.hadoop.hdfs.server.common.Util.connectionFactory;
+
/**
* This class provides fetching a specified file from the NameNode.
*/
@@ -98,27 +94,8 @@ public class TransferFsImage {
}
}
- public final static String CONTENT_LENGTH = "Content-Length";
- public final static String FILE_LENGTH = "File-Length";
- public final static String MD5_HEADER = "X-MD5-Digest";
-
- private final static String CONTENT_TYPE = "Content-Type";
- private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
- private final static int IO_FILE_BUFFER_SIZE;
-
@VisibleForTesting
static int timeout = 0;
- private static final URLConnectionFactory connectionFactory;
- private static final boolean isSpnegoEnabled;
-
- static {
- Configuration conf = new Configuration();
- connectionFactory = URLConnectionFactory
- .newDefaultURLConnectionFactory(conf);
- isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
- IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
- }
-
private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
public static void downloadMostRecentImageToDirectory(URL infoServer,
@@ -159,7 +136,7 @@ public class TransferFsImage {
}
MD5Hash advertisedDigest = parseMD5Header(request);
- MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
+ MD5Hash hash = Util.receiveFile(fileName, dstFiles, dstStorage, true,
advertisedSize, advertisedDigest, fileName, stream, throttler);
LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+ dstFiles.get(0).length() + " bytes.");
@@ -323,9 +300,7 @@ public class TransferFsImage {
responseCode, urlWithParams, connection.getResponseMessage()),
responseCode);
}
- } catch (AuthenticationException e) {
- throw new IOException(e);
- } catch (URISyntaxException e) {
+ } catch (AuthenticationException | URISyntaxException e) {
throw new IOException(e);
} finally {
if (connection != null) {
@@ -336,9 +311,10 @@ public class TransferFsImage {
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile, Canceler canceler)
- throws FileNotFoundException, IOException {
- connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
- connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
+ throws IOException {
+ connection.setRequestProperty(Util.CONTENT_TYPE,
+ "application/octet-stream");
+ connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
@@ -431,7 +407,7 @@ public class TransferFsImage {
* Copies the response from the URL to a list of local files.
* @param dstStorage if an error occurs writing to one of the files,
* this storage object will be notified.
- * @Return a digest of the received file if getChecksum is true
+ * @return a digest of the received file if getChecksum is true
*/
static MD5Hash getFileClient(URL infoServer,
String queryString, List<File> localPaths,
@@ -443,40 +419,12 @@ public class TransferFsImage {
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
- HttpURLConnection connection;
- try {
- connection = (HttpURLConnection)
- connectionFactory.openConnection(url, isSpnegoEnabled);
- } catch (AuthenticationException e) {
- throw new IOException(e);
- }
-
- setTimeout(connection);
+ return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
+ }
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
- throw new HttpGetFailedException(
- "Image transfer servlet at " + url +
- " failed with status code " + connection.getResponseCode() +
- "\nResponse message:\n" + connection.getResponseMessage(),
- connection);
- }
-
- long advertisedSize;
- String contentLength = connection.getHeaderField(CONTENT_LENGTH);
- if (contentLength != null) {
- advertisedSize = Long.parseLong(contentLength);
- } else {
- throw new IOException(CONTENT_LENGTH + " header is not provided " +
- "by the namenode when trying to fetch " + url);
- }
- MD5Hash advertisedDigest = parseMD5Header(connection);
- String fsImageName = connection
- .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
- InputStream stream = connection.getInputStream();
-
- return receiveFile(url.toExternalForm(), localPaths, dstStorage,
- getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
- null);
+ private static MD5Hash parseMD5Header(HttpServletRequest request) {
+ String header = request.getHeader(Util.MD5_HEADER);
+ return (header != null) ? new MD5Hash(header) : null;
}
private static void setTimeout(HttpURLConnection connection) {
@@ -484,204 +432,10 @@ public class TransferFsImage {
Configuration conf = new HdfsConfiguration();
timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
- LOG.info("Image Transfer timeout configured to " + timeout
- + " milliseconds");
+ LOG.info("Image Transfer timeout configured to " + timeout +
+ " milliseconds");
}
- if (timeout > 0) {
- connection.setConnectTimeout(timeout);
- connection.setReadTimeout(timeout);
- }
+ Util.setTimeout(connection, timeout);
}
-
- private static MD5Hash receiveFile(String url, List<File> localPaths,
- Storage dstStorage, boolean getChecksum, long advertisedSize,
- MD5Hash advertisedDigest, String fsImageName, InputStream stream,
- DataTransferThrottler throttler) throws IOException {
- long startTime = Time.monotonicNow();
- Map<FileOutputStream, File> streamPathMap = new HashMap<>();
- StringBuilder xferStats = new StringBuilder();
- double xferCombined = 0;
- if (localPaths != null) {
- // If the local paths refer to directories, use the server-provided header
- // as the filename within that directory
- List<File> newLocalPaths = new ArrayList<File>();
- for (File localPath : localPaths) {
- if (localPath.isDirectory()) {
- if (fsImageName == null) {
- throw new IOException("No filename header provided by server");
- }
- newLocalPaths.add(new File(localPath, fsImageName));
- } else {
- newLocalPaths.add(localPath);
- }
- }
- localPaths = newLocalPaths;
- }
-
-
- long received = 0;
- MessageDigest digester = null;
- if (getChecksum) {
- digester = MD5Hash.getDigester();
- stream = new DigestInputStream(stream, digester);
- }
- boolean finishedReceiving = false;
- int num = 1;
-
- List<FileOutputStream> outputStreams = Lists.newArrayList();
-
- try {
- if (localPaths != null) {
- for (File f : localPaths) {
- try {
- if (f.exists()) {
- LOG.warn("Overwriting existing file " + f
- + " with file downloaded from " + url);
- }
- FileOutputStream fos = new FileOutputStream(f);
- outputStreams.add(fos);
- streamPathMap.put(fos, f);
- } catch (IOException ioe) {
- LOG.warn("Unable to download file " + f, ioe);
- // This will be null if we're downloading the fsimage to a file
- // outside of an NNStorage directory.
- if (dstStorage != null &&
- (dstStorage instanceof StorageErrorReporter)) {
- ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
- }
- }
- }
-
- if (outputStreams.isEmpty()) {
- throw new IOException(
- "Unable to download to any storage directory");
- }
- }
-
- byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
- while (num > 0) {
- num = stream.read(buf);
- if (num > 0) {
- received += num;
- for (FileOutputStream fos : outputStreams) {
- fos.write(buf, 0, num);
- }
- if (throttler != null) {
- throttler.throttle(num);
- }
- }
- }
- finishedReceiving = true;
- double xferSec = Math.max(
- ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
- long xferKb = received / 1024;
- xferCombined += xferSec;
- xferStats.append(
- String.format(" The fsimage download took %.2fs at %.2f KB/s.",
- xferSec, xferKb / xferSec));
- } finally {
- stream.close();
- for (FileOutputStream fos : outputStreams) {
- long flushStartTime = Time.monotonicNow();
- fos.getChannel().force(true);
- fos.close();
- double writeSec = Math.max(((float)
- (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
- xferCombined += writeSec;
- xferStats.append(String
- .format(" Synchronous (fsync) write to disk of " +
- streamPathMap.get(fos).getAbsolutePath() +
- " took %.2fs.", writeSec));
- }
-
- // Something went wrong and did not finish reading.
- // Remove the temporary files.
- if (!finishedReceiving) {
- deleteTmpFiles(localPaths);
- }
-
- if (finishedReceiving && received != advertisedSize) {
- // only throw this exception if we think we read all of it on our end
- // -- otherwise a client-side IOException would be masked by this
- // exception that makes it look like a server-side problem!
- deleteTmpFiles(localPaths);
- throw new IOException("File " + url + " received length " + received +
- " is not of the advertised size " + advertisedSize +
- ". Fsimage name: " + fsImageName + " lastReceived: " + num);
- }
- }
- xferStats.insert(
- 0, String.format(
- "Combined time for fsimage download and fsync " +
- "to all disks took %.2fs.", xferCombined));
- LOG.info(xferStats.toString());
-
- if (digester != null) {
- MD5Hash computedDigest = new MD5Hash(digester.digest());
-
- if (advertisedDigest != null &&
- !computedDigest.equals(advertisedDigest)) {
- deleteTmpFiles(localPaths);
- throw new IOException("File " + url + " computed digest " +
- computedDigest + " does not match advertised digest " +
- advertisedDigest);
- }
- return computedDigest;
- } else {
- return null;
- }
- }
-
- private static void deleteTmpFiles(List<File> files) {
- if (files == null) {
- return;
- }
-
- LOG.info("Deleting temporary files: " + files);
- for (File file : files) {
- if (!file.delete()) {
- LOG.warn("Deleting " + file + " has failed");
- }
- }
- }
-
- private static MD5Hash parseMD5Header(HttpURLConnection connection) {
- String header = connection.getHeaderField(MD5_HEADER);
- return (header != null) ? new MD5Hash(header) : null;
- }
-
- private static MD5Hash parseMD5Header(HttpServletRequest request) {
- String header = request.getHeader(MD5_HEADER);
- return (header != null) ? new MD5Hash(header) : null;
- }
-
- public static class HttpGetFailedException extends IOException {
- private static final long serialVersionUID = 1L;
- private final int responseCode;
-
- HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
- super(msg);
- this.responseCode = connection.getResponseCode();
- }
-
- public int getResponseCode() {
- return responseCode;
- }
- }
-
- public static class HttpPutFailedException extends IOException {
- private static final long serialVersionUID = 1L;
- private final int responseCode;
-
- HttpPutFailedException(String msg, int responseCode) throws IOException {
- super(msg);
- this.responseCode = responseCode;
- }
-
- public int getResponseCode() {
- return responseCode;
- }
- }
-
}
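
Callers that previously caught the nested exception types only need an import change, mirroring the NameNodeRpcServer hunk above. A minimal sketch (not part of the commit) with a hypothetical fetch helper:

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;

import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.common.Util;

public class EditLogFetchExample {
  // Returns null when the remote segment is gone, as NameNodeRpcServer
  // does; previously this caught TransferFsImage.HttpGetFailedException.
  public static File fetchToFile(URL url, File dst, int timeoutMs)
      throws IOException {
    try {
      Util.doGetUrl(url, Collections.singletonList(dst),
          null /* dstStorage */, false /* getChecksum */, timeoutMs);
      return dst;
    } catch (HttpGetFailedException e) {
      return null;
    }
  }
}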