You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/08 21:55:48 UTC

[hadoop] branch branch-2 updated: HDFS-14696. Backport HDFS-11273 to branch-2 (Move TransferFsImage#doGetUrl function to a Util class) (#1251) Contributed by Siyao Meng.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 15062b6  HDFS-14696. Backport HDFS-11273 to branch-2 (Move TransferFsImage#doGetUrl function to a Util class) (#1251) Contributed by Siyao Meng.
15062b6 is described below

commit 15062b6d2882d162dabaf884933d9625fff5ae5f
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Thu Aug 8 14:55:41 2019 -0700

    HDFS-14696. Backport HDFS-11273 to branch-2 (Move TransferFsImage#doGetUrl function to a Util class) (#1251) Contributed by Siyao Meng.
---
 .../hdfs/server/common/HttpGetFailedException.java |  43 ++++
 .../hdfs/server/common/HttpPutFailedException.java |  43 ++++
 .../org/apache/hadoop/hdfs/server/common/Util.java | 261 ++++++++++++++++++-
 .../server/namenode/EditLogFileInputStream.java    |   3 +-
 .../hadoop/hdfs/server/namenode/ImageServlet.java  |  15 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |   4 +-
 .../hdfs/server/namenode/TransferFsImage.java      | 286 ++-------------------
 7 files changed, 374 insertions(+), 281 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
new file mode 100644
index 0000000..592bee2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpGetFailedException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+
+/**
+ * The exception is thrown when HTTP GET operation has failed.
+ * The HTTP response code is read from the connection at construction.
+ */
+@InterfaceAudience.Private
+public class HttpGetFailedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  private final int responseCode;
+
+  public HttpGetFailedException(String msg, HttpURLConnection connection)
+      throws IOException {
+    super(msg);
+    this.responseCode = connection.getResponseCode();
+  }
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
new file mode 100644
index 0000000..77a0ee1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HttpPutFailedException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+
+
+/**
+ * The exception is thrown when HTTP PUT operation has failed.
+ * Carries the HTTP response code supplied by the caller.
+ */
+@InterfaceAudience.Private
+public class HttpPutFailedException extends IOException {
+  private static final long serialVersionUID = 1L;
+  private final int responseCode;
+
+  public HttpPutFailedException(String msg, int responseCode)
+      throws IOException {
+    super(msg);
+    this.responseCode = responseCode;
+  }
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
index ad5d52c..8f7588f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -23,26 +23,64 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 
 @InterfaceAudience.Private
 public final class Util {
   private final static Log LOG = LogFactory.getLog(Util.class.getName());
 
+  /* Required headers for FSImage transfer. */
+  public final static String FILE_LENGTH = "File-Length";
+  public final static String CONTENT_LENGTH = "Content-Length";
+  public final static String MD5_HEADER = "X-MD5-Digest";
+  public final static String CONTENT_TYPE = "Content-Type";
+  public final static String CONTENT_TRANSFER_ENCODING =
+      "Content-Transfer-Encoding";
+
+  public final static int IO_FILE_BUFFER_SIZE;
+  private static final boolean isSpnegoEnabled;
+  public static final URLConnectionFactory connectionFactory;
+
+  static {
+    Configuration conf = new Configuration();
+    connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
+    isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
+    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
+  }
+
   /**
    * Interprets the passed string as a URI. In case of error it 
    * assumes the specified string is a file.
    *
    * @param s the string to interpret
-   * @return the resulting URI 
-   * @throws IOException 
+   * @return the resulting URI
    */
-  public static URI stringAsURI(String s) throws IOException {
+  static URI stringAsURI(String s) throws IOException {
     URI u = null;
     // try to make a URI
     try {
@@ -67,7 +105,6 @@ public final class Util {
    * 
    * @param f the file to convert
    * @return the resulting URI
-   * @throws IOException
    */
   public static URI fileAsURI(File f) throws IOException {
     URI u = f.getCanonicalFile().toURI();
@@ -92,7 +129,7 @@ public final class Util {
    */
   public static List<URI> stringCollectionAsURIs(
                                   Collection<String> names) {
-    List<URI> uris = new ArrayList<URI>(names.size());
+    List<URI> uris = new ArrayList<>(names.size());
     for(String name : names) {
       try {
         uris.add(stringAsURI(name));
@@ -103,7 +140,6 @@ public final class Util {
     return uris;
   }
 
-
   public static boolean isDiskStatsEnabled(int fileIOSamplingPercentage) {
     final boolean isEnabled;
     if (fileIOSamplingPercentage <= 0) {
@@ -120,4 +156,217 @@ public final class Util {
 
     return isEnabled;
   }
+
+  /**
+   * Downloads the file at the given url into the given local paths. Fails
+   * unless the response is HTTP 200 and carries a Content-Length header.
+   */
+  public static MD5Hash doGetUrl(URL url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
+    HttpURLConnection connection;
+    try {
+      connection = (HttpURLConnection)
+          connectionFactory.openConnection(url, isSpnegoEnabled);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    }
+
+    setTimeout(connection, timeout);
+
+    if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+      throw new HttpGetFailedException("Image transfer servlet at " + url +
+              " failed with status code " + connection.getResponseCode() +
+              "\nResponse message:\n" + connection.getResponseMessage(),
+          connection);
+    }
+
+    long advertisedSize;
+    String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+    if (contentLength != null) {
+      advertisedSize = Long.parseLong(contentLength);
+    } else {
+      throw new IOException(CONTENT_LENGTH + " header is not provided " +
+          "by the namenode when trying to fetch " + url);
+    }
+    MD5Hash advertisedDigest = parseMD5Header(connection);
+    String fsImageName = connection
+        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
+    InputStream stream = connection.getInputStream();
+
+    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
+        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
+        null);
+  }
+
+  /**
+   * Streams the file at url into each local path, verifies the advertised
+   * size and digest, and returns the MD5 if getChecksum is set (else null).
+   */
+  public static MD5Hash receiveFile(String url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum, long advertisedSize,
+      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
+      DataTransferThrottler throttler) throws
+      IOException {
+    long startTime = Time.monotonicNow();
+    Map<FileOutputStream, File> streamPathMap = new HashMap<>();
+    StringBuilder xferStats = new StringBuilder();
+    double xferCombined = 0;
+    if (localPaths != null) {
+      // If the local paths refer to directories, use the server-provided header
+      // as the filename within that directory
+      List<File> newLocalPaths = new ArrayList<>();
+      for (File localPath : localPaths) {
+        if (localPath.isDirectory()) {
+          if (fsImageName == null) {
+            throw new IOException("No filename header provided by server");
+          }
+          newLocalPaths.add(new File(localPath, fsImageName));
+        } else {
+          newLocalPaths.add(localPath);
+        }
+      }
+      localPaths = newLocalPaths;
+    }
+
+    long received = 0;
+    MessageDigest digester = null;
+    if (getChecksum) {
+      digester = MD5Hash.getDigester();
+      stream = new DigestInputStream(stream, digester);
+    }
+    boolean finishedReceiving = false;
+
+    List<FileOutputStream> outputStreams = Lists.newArrayList();
+
+    try {
+      if (localPaths != null) {
+        for (File f : localPaths) {
+          try {
+            if (f.exists()) {
+              LOG.warn("Overwriting existing file " + f
+                  + " with file downloaded from " + url);
+            }
+            FileOutputStream fos = new FileOutputStream(f);
+            outputStreams.add(fos);
+            streamPathMap.put(fos, f);
+          } catch (IOException ioe) {
+            LOG.warn("Unable to download file " + f, ioe);
+            // This will be null if we're downloading the fsimage to a file
+            // outside of an NNStorage directory.
+            if (dstStorage != null &&
+                (dstStorage instanceof StorageErrorReporter)) {
+              ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
+            }
+          }
+        }
+
+        if (outputStreams.isEmpty()) {
+          throw new IOException(
+              "Unable to download to any storage directory");
+        }
+      }
+
+      int num = 1;
+      byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
+      while (num > 0) {
+        num = stream.read(buf);
+        if (num > 0) {
+          received += num;
+          for (FileOutputStream fos : outputStreams) {
+            fos.write(buf, 0, num);
+          }
+          if (throttler != null) {
+            throttler.throttle(num);
+          }
+        }
+      }
+      finishedReceiving = true;
+      double xferSec = Math.max(
+          ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
+      long xferKb = received / 1024;
+      xferCombined += xferSec;
+      xferStats.append(
+          String.format(" The fsimage download took %.2fs at %.2f KB/s.",
+              xferSec, xferKb / xferSec));
+    } finally {
+      stream.close();
+      for (FileOutputStream fos : outputStreams) {
+        long flushStartTime = Time.monotonicNow();
+        fos.getChannel().force(true);
+        fos.close();
+        // Elapsed time is now - start; the max() keeps it at least 1 ms.
+        double writeSec = Math.max(((float)
+            (Time.monotonicNow() - flushStartTime)) / 1000.0, 0.001);
+        xferCombined += writeSec;
+        xferStats.append(String
+            .format(" Synchronous (fsync) write to disk of " +
+                streamPathMap.get(fos).getAbsolutePath() +
+                " took %.2fs.", writeSec));
+      }
+
+      // Something went wrong and did not finish reading.
+      // Remove the temporary files.
+      if (!finishedReceiving) {
+        deleteTmpFiles(localPaths);
+      }
+
+      if (finishedReceiving && received != advertisedSize) {
+        // only throw this exception if we think we read all of it on our end
+        // -- otherwise a client-side IOException would be masked by this
+        // exception that makes it look like a server-side problem!
+        deleteTmpFiles(localPaths);
+        throw new IOException("File " + url + " received length " + received +
+            " is not of the advertised size " +
+            advertisedSize);
+      }
+    }
+    xferStats.insert(0, String.format("Combined time for fsimage download and" +
+        " fsync to all disks took %.2fs.", xferCombined));
+    LOG.info(xferStats.toString());
+
+    if (digester != null) {
+      MD5Hash computedDigest = new MD5Hash(digester.digest());
+
+      if (advertisedDigest != null &&
+          !computedDigest.equals(advertisedDigest)) {
+        deleteTmpFiles(localPaths);
+        throw new IOException("File " + url + " computed digest " +
+            computedDigest + " does not match advertised digest " +
+            advertisedDigest);
+      }
+      return computedDigest;
+    } else {
+      return null;
+    }
+  }
+
+  private static void deleteTmpFiles(List<File> files) {
+    if (files == null) {
+      return;
+    }
+    // Best effort: a file that cannot be deleted is only logged as a warning.
+    LOG.info("Deleting temporary files: " + files);
+    for (File file : files) {
+      if (!file.delete()) {
+        LOG.warn("Deleting " + file + " has failed");
+      }
+    }
+  }
+
+  /**
+   * Sets a timeout value in milliseconds for the Http connection.
+   * @param connection the Http connection for which timeout needs to be set
+   * @param timeout connect and read timeout in milliseconds; ignored unless positive
+   */
+  public static void setTimeout(HttpURLConnection connection, int timeout) {
+    if (timeout > 0) {
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+    }
+  }
+
+  private static MD5Hash parseMD5Header(HttpURLConnection connection) {
+    String header = connection.getHeaderField(MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 33a3131..af72027f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -36,9 +36,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
-import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index 9223abd..fdd8d70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.net.HttpURLConnection;
@@ -304,11 +305,12 @@ public class ImageServlet extends HttpServlet {
    */
   public static void setVerificationHeadersForGet(HttpServletResponse response,
       File file) throws IOException {
-    response.setHeader(TransferFsImage.CONTENT_LENGTH,
+    response.setHeader(
+        Util.CONTENT_LENGTH,
         String.valueOf(file.length()));
     MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
     if (hash != null) {
-      response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
+      response.setHeader(Util.MD5_HEADER, hash.toString());
     }
   }
   
@@ -437,12 +439,13 @@ public class ImageServlet extends HttpServlet {
    */
   static void setVerificationHeadersForPut(HttpURLConnection connection,
       File file) throws IOException {
-    connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
+    connection.setRequestProperty(
+        Util.CONTENT_LENGTH,
         String.valueOf(file.length()));
     MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
     if (hash != null) {
       connection
-          .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
+          .setRequestProperty(Util.MD5_HEADER, hash.toString());
     }
   }
 
@@ -462,7 +465,7 @@ public class ImageServlet extends HttpServlet {
     params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
     // setting the length of the file to be uploaded in separate property as
     // Content-Length only supports up to 2GB
-    params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
+    params.put(Util.FILE_LENGTH, Long.toString(imageFileSize));
     params.put(IMAGE_FILE_TYPE, nnf.name());
     return params;
   }
@@ -586,7 +589,7 @@ public class ImageServlet extends HttpServlet {
       txId = ServletUtil.parseLongParam(request, TXID_PARAM);
       storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
       fileSize = ServletUtil.parseLongParam(request,
-          TransferFsImage.FILE_LENGTH);
+          Util.FILE_LENGTH);
       String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
       nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
           .valueOf(imageType);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 830320a..9f9ad26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
+
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.FileNotFoundException;
@@ -137,6 +138,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -2117,7 +2119,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     } catch (FileNotFoundException e) {
       LOG.debug("Tried to read from deleted or moved edit log segment", e);
       return null;
-    } catch (TransferFsImage.HttpGetFailedException e) {
+    } catch (HttpGetFailedException e) {
       LOG.debug("Tried to read from deleted edit log segment", e);
       return null;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 65200fd..1ceb704 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -19,18 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.security.DigestInputStream;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -44,17 +38,16 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.HttpPutFailedException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -66,6 +59,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.mortbay.jetty.EofException;
 
+import static org.apache.hadoop.hdfs.server.common.Util.IO_FILE_BUFFER_SIZE;
+import static org.apache.hadoop.hdfs.server.common.Util.connectionFactory;
+
 /**
  * This class provides fetching a specified file from the NameNode.
  */
@@ -98,27 +94,8 @@ public class TransferFsImage {
     }
   }
 
-  public final static String CONTENT_LENGTH = "Content-Length";
-  public final static String FILE_LENGTH = "File-Length";
-  public final static String MD5_HEADER = "X-MD5-Digest";
-
-  private final static String CONTENT_TYPE = "Content-Type";
-  private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
-  private final static int IO_FILE_BUFFER_SIZE;
-
   @VisibleForTesting
   static int timeout = 0;
-  private static final URLConnectionFactory connectionFactory;
-  private static final boolean isSpnegoEnabled;
-
-  static {
-    Configuration conf = new Configuration();
-    connectionFactory = URLConnectionFactory
-        .newDefaultURLConnectionFactory(conf);
-    isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
-    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
-  }
-
   private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
   
   public static void downloadMostRecentImageToDirectory(URL infoServer,
@@ -159,7 +136,7 @@ public class TransferFsImage {
     }
 
     MD5Hash advertisedDigest = parseMD5Header(request);
-    MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
+    MD5Hash hash = Util.receiveFile(fileName, dstFiles, dstStorage, true,
         advertisedSize, advertisedDigest, fileName, stream, throttler);
     LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
         + dstFiles.get(0).length() + " bytes.");
@@ -323,9 +300,7 @@ public class TransferFsImage {
             responseCode, urlWithParams, connection.getResponseMessage()),
             responseCode);
       }
-    } catch (AuthenticationException e) {
-      throw new IOException(e);
-    } catch (URISyntaxException e) {
+    } catch (AuthenticationException | URISyntaxException e) {
       throw new IOException(e);
     } finally {
       if (connection != null) {
@@ -336,9 +311,10 @@ public class TransferFsImage {
 
   private static void writeFileToPutRequest(Configuration conf,
       HttpURLConnection connection, File imageFile, Canceler canceler)
-      throws FileNotFoundException, IOException {
-    connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
-    connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
+      throws IOException {
+    connection.setRequestProperty(Util.CONTENT_TYPE,
+        "application/octet-stream");
+    connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");
     OutputStream output = connection.getOutputStream();
     FileInputStream input = new FileInputStream(imageFile);
     try {
@@ -431,7 +407,7 @@ public class TransferFsImage {
    * Copies the response from the URL to a list of local files.
    * @param dstStorage if an error occurs writing to one of the files,
    *                   this storage object will be notified. 
-   * @Return a digest of the received file if getChecksum is true
+   * @return a digest of the received file if getChecksum is true
    */
   static MD5Hash getFileClient(URL infoServer,
       String queryString, List<File> localPaths,
@@ -443,40 +419,12 @@ public class TransferFsImage {
   
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    HttpURLConnection connection;
-    try {
-      connection = (HttpURLConnection)
-        connectionFactory.openConnection(url, isSpnegoEnabled);
-    } catch (AuthenticationException e) {
-      throw new IOException(e);
-    }
-
-    setTimeout(connection);
+    return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
+  }
 
-    if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-      throw new HttpGetFailedException(
-          "Image transfer servlet at " + url +
-          " failed with status code " + connection.getResponseCode() +
-          "\nResponse message:\n" + connection.getResponseMessage(),
-          connection);
-    }
-    
-    long advertisedSize;
-    String contentLength = connection.getHeaderField(CONTENT_LENGTH);
-    if (contentLength != null) {
-      advertisedSize = Long.parseLong(contentLength);
-    } else {
-      throw new IOException(CONTENT_LENGTH + " header is not provided " +
-                            "by the namenode when trying to fetch " + url);
-    }
-    MD5Hash advertisedDigest = parseMD5Header(connection);
-    String fsImageName = connection
-        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
-    InputStream stream = connection.getInputStream();
-
-    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
-        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
-        null);
+  private static MD5Hash parseMD5Header(HttpServletRequest request) {
+    String header = request.getHeader(Util.MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
   }
 
   private static void setTimeout(HttpURLConnection connection) {
@@ -484,204 +432,10 @@ public class TransferFsImage {
       Configuration conf = new HdfsConfiguration();
       timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
           DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
-      LOG.info("Image Transfer timeout configured to " + timeout
-          + " milliseconds");
+      LOG.info("Image Transfer timeout configured to " + timeout +
+          " milliseconds");
     }
 
-    if (timeout > 0) {
-      connection.setConnectTimeout(timeout);
-      connection.setReadTimeout(timeout);
-    }
+    Util.setTimeout(connection, timeout);
   }
-
-  private static MD5Hash receiveFile(String url, List<File> localPaths,
-      Storage dstStorage, boolean getChecksum, long advertisedSize,
-      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
-      DataTransferThrottler throttler) throws IOException {
-    long startTime = Time.monotonicNow();
-    Map<FileOutputStream, File> streamPathMap = new HashMap<>();
-    StringBuilder xferStats = new StringBuilder();
-    double xferCombined = 0;
-    if (localPaths != null) {
-      // If the local paths refer to directories, use the server-provided header
-      // as the filename within that directory
-      List<File> newLocalPaths = new ArrayList<File>();
-      for (File localPath : localPaths) {
-        if (localPath.isDirectory()) {
-          if (fsImageName == null) {
-            throw new IOException("No filename header provided by server");
-          }
-          newLocalPaths.add(new File(localPath, fsImageName));
-        } else {
-          newLocalPaths.add(localPath);
-        }
-      }
-      localPaths = newLocalPaths;
-    }
-    
-
-    long received = 0;
-    MessageDigest digester = null;
-    if (getChecksum) {
-      digester = MD5Hash.getDigester();
-      stream = new DigestInputStream(stream, digester);
-    }
-    boolean finishedReceiving = false;
-    int num = 1;
-
-    List<FileOutputStream> outputStreams = Lists.newArrayList();
-
-    try {
-      if (localPaths != null) {
-        for (File f : localPaths) {
-          try {
-            if (f.exists()) {
-              LOG.warn("Overwriting existing file " + f
-                  + " with file downloaded from " + url);
-            }
-            FileOutputStream fos = new FileOutputStream(f);
-            outputStreams.add(fos);
-            streamPathMap.put(fos, f);
-          } catch (IOException ioe) {
-            LOG.warn("Unable to download file " + f, ioe);
-            // This will be null if we're downloading the fsimage to a file
-            // outside of an NNStorage directory.
-            if (dstStorage != null &&
-                (dstStorage instanceof StorageErrorReporter)) {
-              ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
-            }
-          }
-        }
-        
-        if (outputStreams.isEmpty()) {
-          throw new IOException(
-              "Unable to download to any storage directory");
-        }
-      }
-      
-      byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
-      while (num > 0) {
-        num = stream.read(buf);
-        if (num > 0) {
-          received += num;
-          for (FileOutputStream fos : outputStreams) {
-            fos.write(buf, 0, num);
-          }
-          if (throttler != null) {
-            throttler.throttle(num);
-          }
-        }
-      }
-      finishedReceiving = true;
-      double xferSec = Math.max(
-                 ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
-      long xferKb = received / 1024;
-      xferCombined += xferSec;
-      xferStats.append(
-          String.format(" The fsimage download took %.2fs at %.2f KB/s.",
-              xferSec, xferKb / xferSec));
-    } finally {
-      stream.close();
-      for (FileOutputStream fos : outputStreams) {
-        long flushStartTime = Time.monotonicNow();
-        fos.getChannel().force(true);
-        fos.close();
-        double writeSec = Math.max(((float)
-               (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
-        xferCombined += writeSec;
-        xferStats.append(String
-                .format(" Synchronous (fsync) write to disk of " +
-                 streamPathMap.get(fos).getAbsolutePath() +
-                " took %.2fs.", writeSec));
-      }
-
-      // Something went wrong and did not finish reading.
-      // Remove the temporary files.
-      if (!finishedReceiving) {
-        deleteTmpFiles(localPaths);
-      }
-
-      if (finishedReceiving && received != advertisedSize) {
-        // only throw this exception if we think we read all of it on our end
-        // -- otherwise a client-side IOException would be masked by this
-        // exception that makes it look like a server-side problem!
-        deleteTmpFiles(localPaths);
-        throw new IOException("File " + url + " received length " + received +
-            " is not of the advertised size " + advertisedSize +
-            ". Fsimage name: " + fsImageName + " lastReceived: " + num);
-      }
-    }
-    xferStats.insert(
-        0, String.format(
-            "Combined time for fsimage download and fsync " +
-            "to all disks took %.2fs.", xferCombined));
-    LOG.info(xferStats.toString());
-
-    if (digester != null) {
-      MD5Hash computedDigest = new MD5Hash(digester.digest());
-      
-      if (advertisedDigest != null &&
-          !computedDigest.equals(advertisedDigest)) {
-        deleteTmpFiles(localPaths);
-        throw new IOException("File " + url + " computed digest " +
-            computedDigest + " does not match advertised digest " + 
-            advertisedDigest);
-      }
-      return computedDigest;
-    } else {
-      return null;
-    }    
-  }
-
-  private static void deleteTmpFiles(List<File> files) {
-    if (files == null) {
-      return;
-    }
-
-    LOG.info("Deleting temporary files: " + files);
-    for (File file : files) {
-      if (!file.delete()) {
-        LOG.warn("Deleting " + file + " has failed");
-      }
-    }
-  }
-
-  private static MD5Hash parseMD5Header(HttpURLConnection connection) {
-    String header = connection.getHeaderField(MD5_HEADER);
-    return (header != null) ? new MD5Hash(header) : null;
-  }
-
-  private static MD5Hash parseMD5Header(HttpServletRequest request) {
-    String header = request.getHeader(MD5_HEADER);
-    return (header != null) ? new MD5Hash(header) : null;
-  }
-
-  public static class HttpGetFailedException extends IOException {
-    private static final long serialVersionUID = 1L;
-    private final int responseCode;
-
-    HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
-      super(msg);
-      this.responseCode = connection.getResponseCode();
-    }
-    
-    public int getResponseCode() {
-      return responseCode;
-    }
-  }
-
-  public static class HttpPutFailedException extends IOException {
-    private static final long serialVersionUID = 1L;
-    private final int responseCode;
-
-    HttpPutFailedException(String msg, int responseCode) throws IOException {
-      super(msg);
-      this.responseCode = responseCode;
-    }
-
-    public int getResponseCode() {
-      return responseCode;
-    }
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org