Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/01/16 01:10:45 UTC

[19/43] hadoop git commit: MAPREDUCE-6363. [NNBench] Lease mismatch error when running with multiple mappers. Contributed by Vlad Sharanhovich and Bibin A Chundatt.

MAPREDUCE-6363. [NNBench] Lease mismatch error when running with multiple mappers. Contributed by Vlad Sharanhovich and Bibin A Chundatt.
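Background on the fix: NNBench previously derived every mapper's test file
names from the host name alone ("file_" + hostName + "_"), so two mappers
scheduled on the same host created and wrote the same HDFS paths. HDFS grants
a single-writer lease per file, so the second writer failed with a lease
mismatch error. The patch writes a unique sequence number into each control
file (new LongWritable(i) instead of a constant 0) and keys the file names
off that per-map value, so no two mappers share a name. It also makes
analyzeResults() scan every reducer output file under the output directory
rather than only part-00000, and rejects unsupported operations explicitly.
A minimal, illustrative sketch of the naming change (not code from the
patch; the sequence-number variable below is a stand-in for the map input
value):

    import java.net.InetAddress;

    class NamingSketch {
      public static void main(String[] args) throws Exception {
        // Before: every mapper on one host computed an identical prefix,
        // so concurrent mappers raced for the same HDFS file leases.
        String hostName = InetAddress.getLocalHost().getHostName();
        String oldStyle = "file_" + hostName + "_";

        // After: each mapper reads a distinct value (0, 1, 2, ...) from
        // its control file, so its file names are unique cluster-wide.
        long value = 0L;  // stand-in for the per-map LongWritable value
        String newStyle = "file_" + value;  // "file_0", "file_1", ...

        System.out.println(oldStyle + "  vs  " + newStyle);
      }
    }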


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b0964f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b0964f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b0964f3

Branch: refs/heads/HDFS-1312
Commit: 7b0964f354e90968c2dac2f7acc17214732aed64
Parents: 8315582
Author: Akira Ajisaka <aa...@apache.org>
Authored: Thu Jan 14 10:40:22 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Thu Jan 14 10:40:22 2016 +0900

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../java/org/apache/hadoop/hdfs/NNBench.java    | 134 ++++++++++---------
 2 files changed, 72 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b0964f3/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index e6bc050..7d5d11a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1009,6 +1009,9 @@ Release 2.6.4 - UNRELEASED
     TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt via
     jlowe)
 
+    MAPREDUCE-6363. [NNBench] Lease mismatch error when running with multiple
+    mappers. (Vlad Sharanhovich and Bibin A Chundatt via aajisaka)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b0964f3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
index b6c0104..666ef0e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
@@ -18,45 +18,42 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-import java.util.Date;
+import java.io.BufferedReader;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
-import java.io.File;
-import java.io.BufferedReader;
-import java.util.StringTokenizer;
 import java.net.InetAddress;
 import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Iterator;
+import java.util.StringTokenizer;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
-
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile;
-
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 
 /**
  * This program executes a specified operation that applies load to 
@@ -149,7 +146,7 @@ public class NNBench {
       try {
         writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, 
                 LongWritable.class, CompressionType.NONE);
-        writer.append(new Text(strFileName), new LongWritable(0l));
+        writer.append(new Text(strFileName), new LongWritable(i));
       } finally {
         if (writer != null) {
           writer.close();
@@ -309,14 +306,7 @@ public class NNBench {
    */
   private static void analyzeResults() throws IOException {
     final FileSystem fs = FileSystem.get(config);
-    Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
-            "part-00000");
-
-    DataInputStream in;
-    in = new DataInputStream(fs.open(reduceFile));
-
-    BufferedReader lines;
-    lines = new BufferedReader(new InputStreamReader(in));
+    Path reduceDir = new Path(baseDir, OUTPUT_DIR_NAME);
 
     long totalTimeAL1 = 0l;
     long totalTimeAL2 = 0l;
@@ -327,32 +317,38 @@ public class NNBench {
     
     long mapStartTimeTPmS = 0l;
     long mapEndTimeTPmS = 0l;
-    
-    String resultTPSLine1 = null;
-    String resultTPSLine2 = null;
-    String resultALLine1 = null;
-    String resultALLine2 = null;
-    
-    String line;
-    while((line = lines.readLine()) != null) {
-      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
-      String attr = tokens.nextToken();
-      if (attr.endsWith(":totalTimeAL1")) {
-        totalTimeAL1 = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":totalTimeAL2")) {
-        totalTimeAL2 = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":totalTimeTPmS")) {
-        totalTimeTPmS = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":latemaps")) {
-        lateMaps = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":numOfExceptions")) {
-        numOfExceptions = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":successfulFileOps")) {
-        successfulFileOps = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":mapStartTimeTPmS")) {
-        mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
-      } else if (attr.endsWith(":mapEndTimeTPmS")) {
-        mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+
+    FileStatus[] fss = fs.listStatus(reduceDir);
+    for (FileStatus status : fss) {
+
+      Path reduceFile = status.getPath();
+      DataInputStream in;
+      in = new DataInputStream(fs.open(reduceFile));
+
+      BufferedReader lines;
+      lines = new BufferedReader(new InputStreamReader(in));
+
+      String line;
+      while ((line = lines.readLine()) != null) {
+        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
+        String attr = tokens.nextToken();
+        if (attr.endsWith(":totalTimeAL1")) {
+          totalTimeAL1 = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":totalTimeAL2")) {
+          totalTimeAL2 = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":totalTimeTPmS")) {
+          totalTimeTPmS = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":latemaps")) {
+          lateMaps = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":numOfExceptions")) {
+          numOfExceptions = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":successfulFileOps")) {
+          successfulFileOps = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":mapStartTimeTPmS")) {
+          mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
+        } else if (attr.endsWith(":mapEndTimeTPmS")) {
+          mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+        }
       }
     }
     
@@ -377,6 +373,11 @@ public class NNBench {
         (double) successfulFileOps : 
         (double) totalTimeTPmS / successfulFileOps;
             
+    String resultTPSLine1 = null;
+    String resultTPSLine2 = null;
+    String resultALLine1 = null;
+    String resultALLine2 = null;
+
     if (operation.equals(OP_CREATE_WRITE)) {
       // For create/write/close, it is treated as two transactions,
       // since a file create from a client perspective involves create and close
@@ -699,18 +700,22 @@
       successfulFileOps = 0l;
       
       if (barrier()) {
+        String fileName = "file_" + value;
         if (op.equals(OP_CREATE_WRITE)) {
           startTimeTPmS = System.currentTimeMillis();
-          doCreateWriteOp("file_" + hostName + "_", reporter);
+          doCreateWriteOp(fileName, reporter);
         } else if (op.equals(OP_OPEN_READ)) {
           startTimeTPmS = System.currentTimeMillis();
-          doOpenReadOp("file_" + hostName + "_", reporter);
+          doOpenReadOp(fileName, reporter);
         } else if (op.equals(OP_RENAME)) {
           startTimeTPmS = System.currentTimeMillis();
-          doRenameOp("file_" + hostName + "_", reporter);
+          doRenameOp(fileName, reporter);
         } else if (op.equals(OP_DELETE)) {
           startTimeTPmS = System.currentTimeMillis();
-          doDeleteOp("file_" + hostName + "_", reporter);
+          doDeleteOp(fileName, reporter);
+        } else {
+          throw new IllegalArgumentException(
+              "unsupported operation [" + op + "]");
         }
         
         endTimeTPms = System.currentTimeMillis();
@@ -777,9 +781,8 @@ public class NNBench {
 
             reporter.setStatus("Finish "+ l + " files");
           } catch (IOException e) {
-            LOG.info("Exception recorded in op: " +
-                    "Create/Write/Close");
- 
+            LOG.error("Exception recorded in op: Create/Write/Close, "
+                + "file: \"" + filePath + "\"", e);
             numOfExceptions++;
           }
         }
@@ -822,7 +825,8 @@ public class NNBench {
 
             reporter.setStatus("Finish "+ l + " files");
           } catch (IOException e) {
-            LOG.info("Exception recorded in op: OpenRead " + e);
+            LOG.error("Exception recorded in op: OpenRead, " + "file: \""
+                + filePath + "\"", e);
             numOfExceptions++;
           }
         }
@@ -856,8 +860,8 @@ public class NNBench {
 
             reporter.setStatus("Finish "+ l + " files");
           } catch (IOException e) {
-            LOG.info("Exception recorded in op: Rename");
-
+            LOG.error("Exception recorded in op: Rename, " + "file: \""
+                + filePath + "\"", e);
             numOfExceptions++;
           }
         }
@@ -889,8 +893,8 @@ public class NNBench {
 
             reporter.setStatus("Finish "+ l + " files");
           } catch (IOException e) {
-            LOG.info("Exception in recorded op: Delete");
-
+            LOG.error("Exception recorded in op: Delete, " + "file: \""
+                + filePath + "\"", e);
             numOfExceptions++;
           }
         }