You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/10 03:35:43 UTC

[32/50] [abbrv] hadoop git commit: YARN-5199. Close LogReader in AHSWebServices#getStreamingOutput and FileInputStream in NMWebServices#getLogs. Contributed by Xuan Gong

YARN-5199. Close LogReader in AHSWebServices#getStreamingOutput and
FileInputStream in NMWebServices#getLogs. Contributed by Xuan Gong


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58be55b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58be55b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58be55b6

Branch: refs/heads/HDFS-7240
Commit: 58be55b6e07b94aa55ed87c461f3e5c04cc61630
Parents: 8554aee1b
Author: Xuan <xg...@apache.org>
Authored: Tue Jun 7 16:07:02 2016 -0700
Committer: Xuan <xg...@apache.org>
Committed: Tue Jun 7 16:07:02 2016 -0700

----------------------------------------------------------------------
 .../webapp/AHSWebServices.java                  | 155 ++++++++++---------
 .../nodemanager/webapp/NMWebServices.java       |  71 +++++----
 2 files changed, 118 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/58be55b6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index d91ae55..59dbd44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -40,7 +40,6 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
-
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -363,86 +362,94 @@ public class AHSWebServices extends WebServices {
           if ((nodeId == null || nodeName.contains(LogAggregationUtils
               .getNodeString(nodeId))) && !nodeName.endsWith(
               LogAggregationUtils.TMP_FILE_SUFFIX)) {
-            AggregatedLogFormat.LogReader reader =
-                new AggregatedLogFormat.LogReader(conf,
-                    thisNodeFile.getPath());
-            DataInputStream valueStream;
-            LogKey key = new LogKey();
-            valueStream = reader.next(key);
-            while (valueStream != null && !key.toString()
-                .equals(containerIdStr)) {
-              // Next container
-              key = new LogKey();
+            AggregatedLogFormat.LogReader reader = null;
+            try {
+              reader = new AggregatedLogFormat.LogReader(conf,
+                  thisNodeFile.getPath());
+              DataInputStream valueStream;
+              LogKey key = new LogKey();
               valueStream = reader.next(key);
-            }
-            if (valueStream == null) {
-              continue;
-            }
-            while (true) {
-              try {
-                String fileType = valueStream.readUTF();
-                String fileLengthStr = valueStream.readUTF();
-                long fileLength = Long.parseLong(fileLengthStr);
-                if (fileType.equalsIgnoreCase(logFile)) {
-                  StringBuilder sb = new StringBuilder();
-                  sb.append("LogType:");
-                  sb.append(fileType + "\n");
-                  sb.append("Log Upload Time:");
-                  sb.append(Times.format(System.currentTimeMillis()) + "\n");
-                  sb.append("LogLength:");
-                  sb.append(fileLengthStr + "\n");
-                  sb.append("Log Contents:\n");
-                  byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
-                  os.write(b, 0, b.length);
-
-                  long toSkip = 0;
-                  long totalBytesToRead = fileLength;
-                  if (bytes < 0) {
-                    long absBytes = Math.abs(bytes);
-                    if (absBytes < fileLength) {
-                      toSkip = fileLength - absBytes;
-                      totalBytesToRead = absBytes;
+              while (valueStream != null && !key.toString()
+                  .equals(containerIdStr)) {
+                // Next container
+                key = new LogKey();
+                valueStream = reader.next(key);
+              }
+              if (valueStream == null) {
+                continue;
+              }
+              while (true) {
+                try {
+                  String fileType = valueStream.readUTF();
+                  String fileLengthStr = valueStream.readUTF();
+                  long fileLength = Long.parseLong(fileLengthStr);
+                  if (fileType.equalsIgnoreCase(logFile)) {
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("LogType:");
+                    sb.append(fileType + "\n");
+                    sb.append("Log Upload Time:");
+                    sb.append(Times.format(System.currentTimeMillis()) + "\n");
+                    sb.append("LogLength:");
+                    sb.append(fileLengthStr + "\n");
+                    sb.append("Log Contents:\n");
+                    byte[] b = sb.toString().getBytes(
+                        Charset.forName("UTF-8"));
+                    os.write(b, 0, b.length);
+
+                    long toSkip = 0;
+                    long totalBytesToRead = fileLength;
+                    if (bytes < 0) {
+                      long absBytes = Math.abs(bytes);
+                      if (absBytes < fileLength) {
+                        toSkip = fileLength - absBytes;
+                        totalBytesToRead = absBytes;
+                      }
+                      long skippedBytes = valueStream.skip(toSkip);
+                      if (skippedBytes != toSkip) {
+                        throw new IOException("The bytes were skipped are "
+                            + "different from the caller requested");
+                      }
+                    } else {
+                      if (bytes < fileLength) {
+                        totalBytesToRead = bytes;
+                      }
                     }
-                    long skippedBytes = valueStream.skip(toSkip);
-                    if (skippedBytes != toSkip) {
-                      throw new IOException("The bytes were skipped are "
-                          + "different from the caller requested");
+
+                    long curRead = 0;
+                    long pendingRead = totalBytesToRead - curRead;
+                    int toRead = pendingRead > buf.length ? buf.length
+                        : (int) pendingRead;
+                    int len = valueStream.read(buf, 0, toRead);
+                    while (len != -1 && curRead < totalBytesToRead) {
+                      os.write(buf, 0, len);
+                      curRead += len;
+
+                      pendingRead = totalBytesToRead - curRead;
+                      toRead = pendingRead > buf.length ? buf.length
+                          : (int) pendingRead;
+                      len = valueStream.read(buf, 0, toRead);
                     }
+                    sb = new StringBuilder();
+                    sb.append("\nEnd of LogType:" + fileType + "\n");
+                    b = sb.toString().getBytes(Charset.forName("UTF-8"));
+                    os.write(b, 0, b.length);
+                    findLogs = true;
                   } else {
-                    if (bytes < fileLength) {
-                      totalBytesToRead = bytes;
+                    long totalSkipped = 0;
+                    long currSkipped = 0;
+                    while (currSkipped != -1 && totalSkipped < fileLength) {
+                      currSkipped = valueStream.skip(
+                          fileLength - totalSkipped);
+                      totalSkipped += currSkipped;
                     }
                   }
-
-                  long curRead = 0;
-                  long pendingRead = totalBytesToRead - curRead;
-                  int toRead = pendingRead > buf.length ? buf.length
-                      : (int) pendingRead;
-                  int len = valueStream.read(buf, 0, toRead);
-                  while (len != -1 && curRead < totalBytesToRead) {
-                    os.write(buf, 0, len);
-                    curRead += len;
-
-                    pendingRead = totalBytesToRead - curRead;
-                    toRead = pendingRead > buf.length ? buf.length
-                        : (int) pendingRead;
-                    len = valueStream.read(buf, 0, toRead);
-                  }
-                  sb = new StringBuilder();
-                  sb.append("\nEnd of LogType:" + fileType + "\n");
-                  b = sb.toString().getBytes(Charset.forName("UTF-8"));
-                  os.write(b, 0, b.length);
-                  findLogs = true;
-                } else {
-                  long totalSkipped = 0;
-                  long currSkipped = 0;
-                  while (currSkipped != -1 && totalSkipped < fileLength) {
-                    currSkipped = valueStream.skip(fileLength - totalSkipped);
-                    totalSkipped += currSkipped;
-                  }
+                } catch (EOFException eof) {
+                  break;
                 }
-              } catch (EOFException eof) {
-                break;
+              }
+            } finally {
+              if (reader != null) {
+                reader.close();
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/58be55b6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index e13baa7..943f3cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -37,7 +37,7 @@ import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
-
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -236,7 +236,6 @@ public class NMWebServices {
     }
     boolean downloadFile = parseBooleanParam(download);
     final long bytes = parseLongParam(size);
-
     try {
       final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
           containerIdStr, logFile, nmContext);
@@ -246,42 +245,46 @@ public class NMWebServices {
         @Override
         public void write(OutputStream os) throws IOException,
             WebApplicationException {
-          int bufferSize = 65536;
-          byte[] buf = new byte[bufferSize];
-          long toSkip = 0;
-          long totalBytesToRead = fileLength;
-          if (bytes < 0) {
-            long absBytes = Math.abs(bytes);
-            if (absBytes < fileLength) {
-              toSkip = fileLength - absBytes;
-              totalBytesToRead = absBytes;
-            }
-            long skippedBytes = fis.skip(toSkip);
-            if (skippedBytes != toSkip) {
-              throw new IOException("The bytes were skipped are different "
-                  + "from the caller requested");
-            }
-          } else {
-            if (bytes < fileLength) {
-              totalBytesToRead = bytes;
+          try {
+            int bufferSize = 65536;
+            byte[] buf = new byte[bufferSize];
+            long toSkip = 0;
+            long totalBytesToRead = fileLength;
+            if (bytes < 0) {
+              long absBytes = Math.abs(bytes);
+              if (absBytes < fileLength) {
+                toSkip = fileLength - absBytes;
+                totalBytesToRead = absBytes;
+              }
+              long skippedBytes = fis.skip(toSkip);
+              if (skippedBytes != toSkip) {
+                throw new IOException("The bytes were skipped are different "
+                    + "from the caller requested");
+              }
+            } else {
+              if (bytes < fileLength) {
+                totalBytesToRead = bytes;
+              }
             }
-          }
-
-          long curRead = 0;
-          long pendingRead = totalBytesToRead - curRead;
-          int toRead = pendingRead > buf.length ? buf.length
-              : (int) pendingRead;
-          int len = fis.read(buf, 0, toRead);
-          while (len != -1 && curRead < totalBytesToRead) {
-            os.write(buf, 0, len);
-            curRead += len;
 
-            pendingRead = totalBytesToRead - curRead;
-            toRead = pendingRead > buf.length ? buf.length
+            long curRead = 0;
+            long pendingRead = totalBytesToRead - curRead;
+            int toRead = pendingRead > buf.length ? buf.length
                 : (int) pendingRead;
-            len = fis.read(buf, 0, toRead);
+            int len = fis.read(buf, 0, toRead);
+            while (len != -1 && curRead < totalBytesToRead) {
+              os.write(buf, 0, len);
+              curRead += len;
+
+              pendingRead = totalBytesToRead - curRead;
+              toRead = pendingRead > buf.length ? buf.length
+                  : (int) pendingRead;
+              len = fis.read(buf, 0, toRead);
+            }
+            os.flush();
+          } finally {
+            IOUtils.closeQuietly(fis);
           }
-          os.flush();
         }
       };
       ResponseBuilder resp = Response.ok(stream);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org