Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/07/02 08:20:26 UTC

svn commit: r959867 - in /hadoop/mapreduce/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/

Author: amareshwari
Date: Fri Jul  2 06:20:26 2010
New Revision: 959867

URL: http://svn.apache.org/viewvc?rev=959867&view=rev
Log:
MAPREDUCE-1864. Removes uninitialized/unused variables in org.apache.hadoop.streaming.PipeMapRed. Contributed by Amareshwari Sriramadasu
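
The change replaces PipeMapRed's hand-rolled logging helpers (logprintln,
logflush, logStackTrace) with direct Apache Commons Logging calls and drops
the now-unused fields behind them (log_, LOGNAME, jobLog_, fs_, mapredKey_
and the debug flags). A minimal sketch of the idiom the patch moves to (the
class name is illustrative, not from the patch):

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingIdiom {
      private static final Log LOG = LogFactory.getLog(LoggingIdiom.class);

      void example(String name, String value, IOException e) {
        // Plain informational message; replaces logprintln()/logflush().
        LOG.info("Skip env entry:" + name);

        // Guard debug output so the string concatenation is skipped when
        // debug logging is off; replaces the debug_ flag checks.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Add env entry:" + name + "=" + value);
        }

        // Pass the exception as the second argument so the logger prints
        // the stack trace itself; replaces logStackTrace(e).
        LOG.error("configuration exception", e);
      }
    }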

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul  2 06:20:26 2010
@@ -134,6 +134,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1863. Fix NPE in Rumen when processing null CDF for failed task
     attempts. (Amar Kamat via cdouglas)
 
+    MAPREDUCE-1864. Removes uninitialized/unused variables in
+    org.apache.hadoop.streaming.PipeMapRed. (amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Jul  2 06:20:26 2010
@@ -19,7 +19,6 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.util.Date;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.Arrays;
@@ -39,12 +38,9 @@ import org.apache.hadoop.streaming.io.Te
 import org.apache.hadoop.streaming.io.TextOutputReader;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
 
-import org.apache.hadoop.fs.FileSystem;
-
 /** Shared functionality for PipeMapper, PipeReducer.
  */
 public abstract class PipeMapRed {
@@ -155,7 +151,6 @@ public abstract class PipeMapRed {
       joinDelay_ = job.getLong("stream.joindelay.milli", 0);
 
       job_ = job;
-      fs_ = FileSystem.get(job_);
       
       mapInputWriterClass_ = 
         job_.getClass("stream.map.input.writer.class", 
@@ -201,7 +196,7 @@ public abstract class PipeMapRed {
         }
         f = null;
       }
-      logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
+      LOG.info("PipeMapRed exec " + Arrays.asList(argvSplit));
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
@@ -223,52 +218,25 @@ public abstract class PipeMapRed {
       startTime_ = System.currentTimeMillis();
 
     } catch (IOException e) {
-      logStackTrace(e);
       LOG.error("configuration exception", e);
       throw new RuntimeException("configuration exception", e);
     } catch (InterruptedException e)  {
-        logStackTrace(e);
-        LOG.error("configuration exception", e);
-        throw new RuntimeException("configuration exception", e);
-      }
+      LOG.error("configuration exception", e);
+      throw new RuntimeException("configuration exception", e);
+    }
   }
   
   void setStreamJobDetails(JobConf job) {
-    jobLog_ = job.get("stream.jobLog_");
     String s = job.get("stream.minRecWrittenToEnableSkip_");
     if (s != null) {
       minRecWrittenToEnableSkip_ = Long.parseLong(s);
-      logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+      LOG.info("JobConf set minRecWrittenToEnableSkip_ ="
+          + minRecWrittenToEnableSkip_);
     }
     taskId_ = StreamUtil.getTaskInfo(job_);
   }
 
-  void logStackTrace(Exception e) {
-    if (e == null) return;
-    e.printStackTrace();
-    if (log_ != null) {
-      e.printStackTrace(log_);
-    }
-  }
-
-  void logprintln(String s) {
-    if (log_ != null) {
-      log_.println(s);
-    } else {
-      LOG.info(s); // or LOG.info()
-    }
-  }
-
-  void logflush() {
-    if (log_ != null) {
-      log_.flush();
-    }
-  }
-
   void addJobConfToEnvironment(JobConf conf, Properties env) {
-    if (debug_) {
-      logprintln("addJobConfToEnvironment: begin");
-    }
     Iterator it = conf.iterator();
     while (it.hasNext()) {
       Map.Entry en = (Map.Entry) it.next();
@@ -278,9 +246,6 @@ public abstract class PipeMapRed {
       name = safeEnvVarName(name);
       envPut(env, name, value);
     }
-    if (debug_) {
-      logprintln("addJobConfToEnvironment: end");
-    }
   }
 
   String safeEnvVarName(String var) {
@@ -306,7 +271,7 @@ public abstract class PipeMapRed {
     for (int i = 0; i < nv.length; i++) {
       String[] pair = nv[i].split("=", 2);
       if (pair.length != 2) {
-        logprintln("Skip ev entry:" + nv[i]);
+        LOG.info("Skip env entry:" + nv[i]);
       } else {
         envPut(env, pair[0], pair[1]);
       }
@@ -314,22 +279,12 @@ public abstract class PipeMapRed {
   }
 
   void envPut(Properties env, String name, String value) {
-    if (debug_) {
-      logprintln("Add  ev entry:" + name + "=" + value);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Add  env entry:" + name + "=" + value);
     }
     env.put(name, value);
   }
 
-  /** .. and if successful: delete the task log */
-  void appendLogToJobLog(String status) {
-    if (jobLog_ == null) {
-      return; // not using a common joblog
-    }
-    if (log_ != null) {
-      StreamUtil.exec("/bin/rm " + LOGNAME, log_);
-    }
-  }
-
   void startOutputThreads(OutputCollector output, Reporter reporter) 
     throws IOException {
     inWriter_ = createInputWriter();
@@ -366,8 +321,8 @@ public abstract class PipeMapRed {
           throw new RuntimeException("PipeMapRed.waitOutputThreads(): subprocess failed with code "
                                      + exitVal);
         } else {
-          logprintln("PipeMapRed.waitOutputThreads(): subprocess exited with code " + exitVal
-                     + " in " + PipeMapRed.class.getName());
+          LOG.info("PipeMapRed.waitOutputThreads(): subprocess exited with " +
+          		"code " + exitVal + " in " + PipeMapRed.class.getName());
         }
       }
       if (outThread_ != null) {
@@ -433,13 +388,12 @@ public abstract class PipeMapRed {
             } else {
               reporter.progress();
             }
-            logprintln(hline);
-            logflush();
+            LOG.info(hline);
           }
         }
       } catch (Throwable th) {
         outerrThreadsThrowable = th;
-        LOG.warn(StringUtils.stringifyException(th));
+        LOG.warn(th);
       } finally {
         try {
           if (clientIn_ != null) {
@@ -447,7 +401,7 @@ public abstract class PipeMapRed {
             clientIn_ = null;
           }
         } catch (IOException io) {
-          LOG.info(StringUtils.stringifyException(io));
+          LOG.info(io);
         }
       }
     }
@@ -508,7 +462,7 @@ public abstract class PipeMapRed {
         }
       } catch (Throwable th) {
         outerrThreadsThrowable = th;
-        LOG.warn(StringUtils.stringifyException(th));
+        LOG.warn(th);
         try {
           if (lineReader != null) {
             lineReader.close();
@@ -518,7 +472,7 @@ public abstract class PipeMapRed {
             clientErr_ = null;
           }
         } catch (IOException io) {
-          LOG.info(StringUtils.stringifyException(io));
+          LOG.info(io);
         }
       }
     }
@@ -565,7 +519,7 @@ public abstract class PipeMapRed {
   public void mapRedFinished() {
     try {
       if (!doPipe_) {
-        logprintln("mapRedFinished");
+        LOG.info("mapRedFinished");
         return;
       }
       try {
@@ -575,25 +529,20 @@ public abstract class PipeMapRed {
         }
         waitOutputThreads();
       } catch (IOException io) {
-        LOG.warn(StringUtils.stringifyException(io));
+        LOG.warn(io);
       }
       if (sim != null) sim.destroy();
-      logprintln("mapRedFinished");
+      LOG.info("mapRedFinished");
     } catch (RuntimeException e) {
-      logprintln("PipeMapRed failed!");
-      logStackTrace(e);
+      LOG.info("PipeMapRed failed!", e);
       throw e;
     }
-    if (debugFailLate_) {
-      throw new RuntimeException("debugFailLate_");
-    }
   }
 
   void maybeLogRecord() {
     if (numRecRead_ >= nextRecReadLog_) {
       String info = numRecInfo();
-      logprintln(info);
-      logflush();
+      LOG.info(info);
       if (nextRecReadLog_ < 100000) {
 	  nextRecReadLog_ *= 10;
       } else {
@@ -606,18 +555,12 @@ public abstract class PipeMapRed {
 
     String s = numRecInfo() + "\n";
     s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
-    s += "LOGNAME=" + LOGNAME + "\n";
     s += envline("HOST");
     s += envline("USER");
     s += envline("HADOOP_USER");
-    //s += envline("PWD"); // =/home/crawler/hadoop/trunk
-    s += "last Hadoop input: |" + mapredKey_ + "|\n";
     if (outThread_ != null) {
       s += "last tool output: |" + outReader_.getLastOutput() + "|\n";
     }
-    s += "Date: " + new Date() + "\n";
-    // s += envline("HADOOP_HOME");
-    // s += envline("REMOTE_HOST");
     return s;
   }
 
@@ -636,15 +579,6 @@ public abstract class PipeMapRed {
     return (d == 0) ? "NA" : "" + n / d + "=" + n + "/" + d;
   }
 
-  String logFailure(Exception e) {
-    StringWriter sw = new StringWriter();
-    PrintWriter pw = new PrintWriter(sw);
-    e.printStackTrace(pw);
-    String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
-    logprintln(msg);
-    return msg;
-  }
-
   long startTime_;
   long numRecRead_ = 0;
   long numRecWritten_ = 0;
@@ -657,13 +591,8 @@ public abstract class PipeMapRed {
   long reporterErrDelay_ = 10*1000L; 
   long joinDelay_;
   JobConf job_;
-  FileSystem fs_;
 
   boolean doPipe_;
-  boolean debug_;
-  boolean debugFailEarly_;
-  boolean debugFailDuring_;
-  boolean debugFailLate_;
 
   Class<? extends InputWriter> mapInputWriterClass_;
   Class<? extends OutputReader> mapOutputReaderClass_;
@@ -675,21 +604,16 @@ public abstract class PipeMapRed {
   InputWriter inWriter_;
   OutputReader outReader_;
   MROutputThread outThread_;
-  String jobLog_;
   MRErrorThread errThread_;
   DataOutputStream clientOut_;
   DataInputStream clientErr_;
   DataInputStream clientIn_;
 
   // set in PipeMapper/PipeReducer subclasses
-  String mapredKey_;
   int numExceptions_;
   StreamUtil.TaskId taskId_;
 
   protected volatile Throwable outerrThreadsThrowable;
 
-  String LOGNAME;
-  PrintStream log_;
-
   volatile boolean processProvidedStatus_ = false;
 }
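
A note on the LOG.warn(th) and LOG.info(io) calls above: Commons Logging
declares both a single-argument overload, warn(Object), and a two-argument
overload, warn(Object, Throwable). Passing the throwable alone selects the
first, which with most backends renders only its toString(); the stack trace
is guaranteed only with the two-argument form. A minimal sketch of the
contrast (illustrative code, not from the patch):

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class WarnOverloads {
      private static final Log LOG = LogFactory.getLog(WarnOverloads.class);

      void report(IOException io) {
        // Throwable used as the message object: typically io.toString() only.
        LOG.warn(io);
        // (message, throwable) overload: the backend appends the stack trace.
        LOG.warn("output thread failed", io);
      }
    }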

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Fri Jul  2 06:20:26 2010
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.streaming.io.InputWriter;
 import org.apache.hadoop.streaming.io.OutputReader;
 import org.apache.hadoop.streaming.io.TextInputWriter;
-import org.apache.hadoop.util.StringUtils;
 
 /** A generic Mapper bridge.
  *  It delegates operations to an external program via stdin and stdout.
@@ -91,17 +90,13 @@ public class PipeMapper extends PipeMapR
   public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
     if (outerrThreadsThrowable != null) {
       mapRedFinished();
-      throw new IOException ("MROutput/MRErrThread failed:"
-                             + StringUtils.stringifyException(
-                                                              outerrThreadsThrowable));
+      throw new IOException("MROutput/MRErrThread failed:",
+          outerrThreadsThrowable);
     }
     try {
       // 1/4 Hadoop in
       numRecRead_++;
       maybeLogRecord();
-      if (debugFailDuring_ && numRecRead_ == 3) {
-        throw new IOException("debugFailDuring_");
-      }
 
       // 2/4 Hadoop to Tool
       if (numExceptions_ == 0) {
@@ -121,10 +116,9 @@ public class PipeMapper extends PipeMapR
       numExceptions_++;
       if (numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
         // terminate with failure
-        String msg = logFailure(io);
-        appendLogToJobLog("failure");
+        LOG.info(getContext(), io);
         mapRedFinished();
-        throw new IOException(msg);
+        throw io;
       } else {
         // terminate with success:
         // swallow input records although the stream processor failed/closed
@@ -133,7 +127,6 @@ public class PipeMapper extends PipeMapR
   }
 
   public void close() {
-    appendLogToJobLog("success");
     mapRedFinished();
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Fri Jul  2 06:20:26 2010
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.SkipBadR
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.streaming.io.InputWriter;
 import org.apache.hadoop.streaming.io.OutputReader;
-import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Writable;
 
@@ -96,9 +95,8 @@ public class PipeReducer extends PipeMap
         if (doPipe_) {
           if (outerrThreadsThrowable != null) {
             mapRedFinished();
-            throw new IOException ("MROutput/MRErrThread failed:"
-                                   + StringUtils.stringifyException(
-                                                                    outerrThreadsThrowable));
+            throw new IOException("MROutput/MRErrThread failed:",
+                outerrThreadsThrowable);
           }
           inWriter_.writeKey(key);
           inWriter_.writeValue(val);
@@ -127,14 +125,12 @@ public class PipeReducer extends PipeMap
         // hmm, but child is still running.  go figure.
 	extraInfo = "subprocess still running\n";
       };
-      appendLogToJobLog("failure");
       mapRedFinished();
       throw new IOException(extraInfo + getContext() + io.getMessage());
     }
   }
 
   public void close() {
-    appendLogToJobLog("success");
     mapRedFinished();
   }
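
In both PipeMapper and PipeReducer the patch stops flattening the output
thread's Throwable into the message via StringUtils.stringifyException and
instead chains it as the IOException's cause. A minimal before/after sketch
(illustrative wrapper class, not from the patch; outerrThreadsThrowable
stands in for the PipeMapRed field of the same name):

    import java.io.IOException;
    import org.apache.hadoop.util.StringUtils;

    public class ChainingSketch {
      private Throwable outerrThreadsThrowable = new RuntimeException("boom");

      void before() throws IOException {
        // Old style: stack trace flattened into the message string.
        throw new IOException("MROutput/MRErrThread failed:"
            + StringUtils.stringifyException(outerrThreadsThrowable));
      }

      void after() throws IOException {
        // New style: cause chained, so callers can use getCause() and
        // loggers print the original trace. Requires Java 6+.
        throw new IOException("MROutput/MRErrThread failed:",
            outerrThreadsThrowable);
      }
    }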