You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/07/02 08:20:26 UTC
svn commit: r959867 - in /hadoop/mapreduce/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
Author: amareshwari
Date: Fri Jul 2 06:20:26 2010
New Revision: 959867
URL: http://svn.apache.org/viewvc?rev=959867&view=rev
Log:
MAPREDUCE-1864. Removes uninitialized/unused variables in org.apache.hadoop.streaming.PipeMapRed. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 2 06:20:26 2010
@@ -134,6 +134,9 @@ Trunk (unreleased changes)
MAPREDUCE-1863. Fix NPE in Rumen when processing null CDF for failed task
attempts. (Amar Kamat via cdouglas)
+ MAPREDUCE-1864. Removes uninitialized/unused variables in
+ org.apache.hadoop.streaming.PipeMapRed. (amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Jul 2 06:20:26 2010
@@ -19,7 +19,6 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.util.Date;
import java.util.Map;
import java.util.Iterator;
import java.util.Arrays;
@@ -39,12 +38,9 @@ import org.apache.hadoop.streaming.io.Te
import org.apache.hadoop.streaming.io.TextOutputReader;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-
/** Shared functionality for PipeMapper, PipeReducer.
*/
public abstract class PipeMapRed {
@@ -155,7 +151,6 @@ public abstract class PipeMapRed {
joinDelay_ = job.getLong("stream.joindelay.milli", 0);
job_ = job;
- fs_ = FileSystem.get(job_);
mapInputWriterClass_ =
job_.getClass("stream.map.input.writer.class",
@@ -201,7 +196,7 @@ public abstract class PipeMapRed {
}
f = null;
}
- logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
+ LOG.info("PipeMapRed exec " + Arrays.asList(argvSplit));
Environment childEnv = (Environment) StreamUtil.env().clone();
addJobConfToEnvironment(job_, childEnv);
addEnvironment(childEnv, job_.get("stream.addenvironment"));
@@ -223,52 +218,25 @@ public abstract class PipeMapRed {
startTime_ = System.currentTimeMillis();
} catch (IOException e) {
- logStackTrace(e);
LOG.error("configuration exception", e);
throw new RuntimeException("configuration exception", e);
} catch (InterruptedException e) {
- logStackTrace(e);
- LOG.error("configuration exception", e);
- throw new RuntimeException("configuration exception", e);
- }
+ LOG.error("configuration exception", e);
+ throw new RuntimeException("configuration exception", e);
+ }
}
void setStreamJobDetails(JobConf job) {
- jobLog_ = job.get("stream.jobLog_");
String s = job.get("stream.minRecWrittenToEnableSkip_");
if (s != null) {
minRecWrittenToEnableSkip_ = Long.parseLong(s);
- logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+ LOG.info("JobConf set minRecWrittenToEnableSkip_ ="
+ + minRecWrittenToEnableSkip_);
}
taskId_ = StreamUtil.getTaskInfo(job_);
}
- void logStackTrace(Exception e) {
- if (e == null) return;
- e.printStackTrace();
- if (log_ != null) {
- e.printStackTrace(log_);
- }
- }
-
- void logprintln(String s) {
- if (log_ != null) {
- log_.println(s);
- } else {
- LOG.info(s); // or LOG.info()
- }
- }
-
- void logflush() {
- if (log_ != null) {
- log_.flush();
- }
- }
-
void addJobConfToEnvironment(JobConf conf, Properties env) {
- if (debug_) {
- logprintln("addJobConfToEnvironment: begin");
- }
Iterator it = conf.iterator();
while (it.hasNext()) {
Map.Entry en = (Map.Entry) it.next();
@@ -278,9 +246,6 @@ public abstract class PipeMapRed {
name = safeEnvVarName(name);
envPut(env, name, value);
}
- if (debug_) {
- logprintln("addJobConfToEnvironment: end");
- }
}
String safeEnvVarName(String var) {
@@ -306,7 +271,7 @@ public abstract class PipeMapRed {
for (int i = 0; i < nv.length; i++) {
String[] pair = nv[i].split("=", 2);
if (pair.length != 2) {
- logprintln("Skip ev entry:" + nv[i]);
+ LOG.info("Skip env entry:" + nv[i]);
} else {
envPut(env, pair[0], pair[1]);
}
@@ -314,22 +279,12 @@ public abstract class PipeMapRed {
}
void envPut(Properties env, String name, String value) {
- if (debug_) {
- logprintln("Add ev entry:" + name + "=" + value);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add env entry:" + name + "=" + value);
}
env.put(name, value);
}
- /** .. and if successful: delete the task log */
- void appendLogToJobLog(String status) {
- if (jobLog_ == null) {
- return; // not using a common joblog
- }
- if (log_ != null) {
- StreamUtil.exec("/bin/rm " + LOGNAME, log_);
- }
- }
-
void startOutputThreads(OutputCollector output, Reporter reporter)
throws IOException {
inWriter_ = createInputWriter();
@@ -366,8 +321,8 @@ public abstract class PipeMapRed {
throw new RuntimeException("PipeMapRed.waitOutputThreads(): subprocess failed with code "
+ exitVal);
} else {
- logprintln("PipeMapRed.waitOutputThreads(): subprocess exited with code " + exitVal
- + " in " + PipeMapRed.class.getName());
+ LOG.info("PipeMapRed.waitOutputThreads(): subprocess exited with " +
+ "code " + exitVal + " in " + PipeMapRed.class.getName());
}
}
if (outThread_ != null) {
@@ -433,13 +388,12 @@ public abstract class PipeMapRed {
} else {
reporter.progress();
}
- logprintln(hline);
- logflush();
+ LOG.info(hline);
}
}
} catch (Throwable th) {
outerrThreadsThrowable = th;
- LOG.warn(StringUtils.stringifyException(th));
+ LOG.warn(th);
} finally {
try {
if (clientIn_ != null) {
@@ -447,7 +401,7 @@ public abstract class PipeMapRed {
clientIn_ = null;
}
} catch (IOException io) {
- LOG.info(StringUtils.stringifyException(io));
+ LOG.info(io);
}
}
}
@@ -508,7 +462,7 @@ public abstract class PipeMapRed {
}
} catch (Throwable th) {
outerrThreadsThrowable = th;
- LOG.warn(StringUtils.stringifyException(th));
+ LOG.warn(th);
try {
if (lineReader != null) {
lineReader.close();
@@ -518,7 +472,7 @@ public abstract class PipeMapRed {
clientErr_ = null;
}
} catch (IOException io) {
- LOG.info(StringUtils.stringifyException(io));
+ LOG.info(io);
}
}
}
@@ -565,7 +519,7 @@ public abstract class PipeMapRed {
public void mapRedFinished() {
try {
if (!doPipe_) {
- logprintln("mapRedFinished");
+ LOG.info("mapRedFinished");
return;
}
try {
@@ -575,25 +529,20 @@ public abstract class PipeMapRed {
}
waitOutputThreads();
} catch (IOException io) {
- LOG.warn(StringUtils.stringifyException(io));
+ LOG.warn(io);
}
if (sim != null) sim.destroy();
- logprintln("mapRedFinished");
+ LOG.info("mapRedFinished");
} catch (RuntimeException e) {
- logprintln("PipeMapRed failed!");
- logStackTrace(e);
+ LOG.info("PipeMapRed failed!", e);
throw e;
}
- if (debugFailLate_) {
- throw new RuntimeException("debugFailLate_");
- }
}
void maybeLogRecord() {
if (numRecRead_ >= nextRecReadLog_) {
String info = numRecInfo();
- logprintln(info);
- logflush();
+ LOG.info(info);
if (nextRecReadLog_ < 100000) {
nextRecReadLog_ *= 10;
} else {
@@ -606,18 +555,12 @@ public abstract class PipeMapRed {
String s = numRecInfo() + "\n";
s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
- s += "LOGNAME=" + LOGNAME + "\n";
s += envline("HOST");
s += envline("USER");
s += envline("HADOOP_USER");
- //s += envline("PWD"); // =/home/crawler/hadoop/trunk
- s += "last Hadoop input: |" + mapredKey_ + "|\n";
if (outThread_ != null) {
s += "last tool output: |" + outReader_.getLastOutput() + "|\n";
}
- s += "Date: " + new Date() + "\n";
- // s += envline("HADOOP_HOME");
- // s += envline("REMOTE_HOST");
return s;
}
@@ -636,15 +579,6 @@ public abstract class PipeMapRed {
return (d == 0) ? "NA" : "" + n / d + "=" + n + "/" + d;
}
- String logFailure(Exception e) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
- logprintln(msg);
- return msg;
- }
-
long startTime_;
long numRecRead_ = 0;
long numRecWritten_ = 0;
@@ -657,13 +591,8 @@ public abstract class PipeMapRed {
long reporterErrDelay_ = 10*1000L;
long joinDelay_;
JobConf job_;
- FileSystem fs_;
boolean doPipe_;
- boolean debug_;
- boolean debugFailEarly_;
- boolean debugFailDuring_;
- boolean debugFailLate_;
Class<? extends InputWriter> mapInputWriterClass_;
Class<? extends OutputReader> mapOutputReaderClass_;
@@ -675,21 +604,16 @@ public abstract class PipeMapRed {
InputWriter inWriter_;
OutputReader outReader_;
MROutputThread outThread_;
- String jobLog_;
MRErrorThread errThread_;
DataOutputStream clientOut_;
DataInputStream clientErr_;
DataInputStream clientIn_;
// set in PipeMapper/PipeReducer subclasses
- String mapredKey_;
int numExceptions_;
StreamUtil.TaskId taskId_;
protected volatile Throwable outerrThreadsThrowable;
- String LOGNAME;
- PrintStream log_;
-
volatile boolean processProvidedStatus_ = false;
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Fri Jul 2 06:20:26 2010
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.streaming.io.TextInputWriter;
-import org.apache.hadoop.util.StringUtils;
/** A generic Mapper bridge.
* It delegates operations to an external program via stdin and stdout.
@@ -91,17 +90,13 @@ public class PipeMapper extends PipeMapR
public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
if (outerrThreadsThrowable != null) {
mapRedFinished();
- throw new IOException ("MROutput/MRErrThread failed:"
- + StringUtils.stringifyException(
- outerrThreadsThrowable));
+ throw new IOException("MROutput/MRErrThread failed:",
+ outerrThreadsThrowable);
}
try {
// 1/4 Hadoop in
numRecRead_++;
maybeLogRecord();
- if (debugFailDuring_ && numRecRead_ == 3) {
- throw new IOException("debugFailDuring_");
- }
// 2/4 Hadoop to Tool
if (numExceptions_ == 0) {
@@ -121,10 +116,9 @@ public class PipeMapper extends PipeMapR
numExceptions_++;
if (numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
// terminate with failure
- String msg = logFailure(io);
- appendLogToJobLog("failure");
+ LOG.info(getContext() , io);
mapRedFinished();
- throw new IOException(msg);
+ throw io;
} else {
// terminate with success:
// swallow input records although the stream processor failed/closed
@@ -133,7 +127,6 @@ public class PipeMapper extends PipeMapR
}
public void close() {
- appendLogToJobLog("success");
mapRedFinished();
}
Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=959867&r1=959866&r2=959867&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Fri Jul 2 06:20:26 2010
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.SkipBadR
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.streaming.io.InputWriter;
import org.apache.hadoop.streaming.io.OutputReader;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Writable;
@@ -96,9 +95,8 @@ public class PipeReducer extends PipeMap
if (doPipe_) {
if (outerrThreadsThrowable != null) {
mapRedFinished();
- throw new IOException ("MROutput/MRErrThread failed:"
- + StringUtils.stringifyException(
- outerrThreadsThrowable));
+ throw new IOException("MROutput/MRErrThread failed:",
+ outerrThreadsThrowable);
}
inWriter_.writeKey(key);
inWriter_.writeValue(val);
@@ -127,14 +125,12 @@ public class PipeReducer extends PipeMap
// hmm, but child is still running. go figure.
extraInfo = "subprocess still running\n";
};
- appendLogToJobLog("failure");
mapRedFinished();
throw new IOException(extraInfo + getContext() + io.getMessage());
}
}
public void close() {
- appendLogToJobLog("success");
mapRedFinished();
}