You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2016/06/29 15:10:40 UTC
[1/2] nutch git commit: NUTCH-2262 Utilize parameterized logging notation across Fetcher
Repository: nutch
Updated Branches:
refs/heads/master abc01175d -> d96c936b6
NUTCH-2262 Utilize parameterized logging notation across Fetcher
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/df5eed2e
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/df5eed2e
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/df5eed2e
Branch: refs/heads/master
Commit: df5eed2e1293f8529057a114edafe4c60f45201a
Parents: 9565389
Author: Lewis John McGibbney <le...@gmail.com>
Authored: Tue May 17 09:38:01 2016 -0700
Committer: Lewis John McGibbney <le...@gmail.com>
Committed: Tue May 17 09:38:01 2016 -0700
----------------------------------------------------------------------
.../org/apache/nutch/fetcher/FetchItem.java | 2 +-
src/java/org/apache/nutch/fetcher/Fetcher.java | 82 +++++++++-----------
.../nutch/fetcher/FetcherOutputFormat.java | 4 +-
3 files changed, 39 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nutch/blob/df5eed2e/src/java/org/apache/nutch/fetcher/FetchItem.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/fetcher/FetchItem.java b/src/java/org/apache/nutch/fetcher/FetchItem.java
index 833fa38..3ad4970 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItem.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItem.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
/**
- * This class described the item to be fetched.
+ * This class describes the item to be fetched.
*/
public class FetchItem {
http://git-wip-us.apache.org/repos/asf/nutch/blob/df5eed2e/src/java/org/apache/nutch/fetcher/Fetcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java
index 2030778..aad9ee9 100644
--- a/src/java/org/apache/nutch/fetcher/Fetcher.java
+++ b/src/java/org/apache/nutch/fetcher/Fetcher.java
@@ -78,7 +78,7 @@ import org.apache.nutch.util.*;
* @author Andrzej Bialecki
*/
public class Fetcher extends NutchTool implements Tool,
- MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
+MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
public static final int PERM_REFRESH_TIME = 5;
@@ -89,7 +89,7 @@ public class Fetcher extends NutchTool implements Tool,
public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
public static class InputFormat extends
- SequenceFileInputFormat<Text, CrawlDatum> {
+ SequenceFileInputFormat<Text, CrawlDatum> {
/** Don't split inputs, to keep things polite. */
public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
FileStatus[] files = listStatus(job);
@@ -103,6 +103,7 @@ public class Fetcher extends NutchTool implements Tool,
}
}
+ @SuppressWarnings("unused")
private OutputCollector<Text, NutchWritable> output;
private Reporter reporter;
@@ -125,7 +126,7 @@ public class Fetcher extends NutchTool implements Tool,
LinkedList<FetcherThread> fetcherThreads = new LinkedList<FetcherThread>();
public Fetcher() {
- super(null);
+ super(null);
}
public Fetcher(Configuration conf) {
@@ -141,14 +142,14 @@ public class Fetcher extends NutchTool implements Tool,
long avgBytesSec = (bytes.get() / 128l) / elapsed.longValue();
status.append(activeThreads).append(" threads (").append(spinWaiting.get())
- .append(" waiting), ");
+ .append(" waiting), ");
status.append(fetchQueues.getQueueCount()).append(" queues, ");
status.append(fetchQueues.getTotalSize()).append(" URLs queued, ");
status.append(pages).append(" pages, ").append(errors).append(" errors, ");
status.append(String.format("%.2f", avgPagesSec)).append(" pages/s (");
status.append(pagesLastSec).append(" last sec), ");
status.append(avgBytesSec).append(" kbits/s (")
- .append((bytesLastSec / 128)).append(" last sec)");
+ .append((bytesLastSec / 128)).append(" last sec)");
reporter.setStatus(status.toString());
}
@@ -178,7 +179,7 @@ public class Fetcher extends NutchTool implements Tool,
public void run(RecordReader<Text, CrawlDatum> input,
OutputCollector<Text, NutchWritable> output, Reporter reporter)
- throws IOException {
+ throws IOException {
this.output = output;
this.reporter = reporter;
@@ -186,12 +187,12 @@ public class Fetcher extends NutchTool implements Tool,
int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
if (LOG.isInfoEnabled()) {
- LOG.info("Fetcher: threads: " + threadCount);
+ LOG.info("Fetcher: threads: {}", threadCount);
}
int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2);
if (LOG.isInfoEnabled()) {
- LOG.info("Fetcher: time-out divisor: " + timeoutDivisor);
+ LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
}
int queueDepthMuliplier = getConf().getInt(
@@ -229,20 +230,18 @@ public class Fetcher extends NutchTool implements Tool,
int pagesLastSec;
int bytesLastSec;
- // Set to true whenever the threshold has been exceeded for the first time
- boolean throughputThresholdExceeded = false;
int throughputThresholdNumRetries = 0;
int throughputThresholdPages = getConf().getInt(
"fetcher.throughput.threshold.pages", -1);
if (LOG.isInfoEnabled()) {
- LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages);
+ LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
}
int throughputThresholdMaxRetries = getConf().getInt(
"fetcher.throughput.threshold.retries", 5);
if (LOG.isInfoEnabled()) {
- LOG.info("Fetcher: throughput threshold retries: "
- + throughputThresholdMaxRetries);
+ LOG.info("Fetcher: throughput threshold retries: {}",
+ throughputThresholdMaxRetries);
}
long throughputThresholdTimeLimit = getConf().getLong(
"fetcher.throughput.threshold.check.after", -1);
@@ -250,8 +249,8 @@ public class Fetcher extends NutchTool implements Tool,
int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000;
int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount);
if (maxNumThreads < threadCount) {
- LOG.info("fetcher.maxNum.threads can't be < than " + threadCount
- + " : using " + threadCount + " instead");
+ LOG.info("fetcher.maxNum.threads can't be < than {} : using {} instead",
+ threadCount, threadCount);
maxNumThreads = threadCount;
}
int bandwidthTargetCheckEveryNSecs = getConf().getInt(
@@ -297,10 +296,8 @@ public class Fetcher extends NutchTool implements Tool,
// Check if we're dropping below the threshold
if (pagesLastSec < throughputThresholdPages) {
throughputThresholdNumRetries++;
- LOG.warn(Integer.toString(throughputThresholdNumRetries)
- + ": dropping below configured threshold of "
- + Integer.toString(throughputThresholdPages)
- + " pages per second");
+ LOG.warn("{}: dropping below configured threshold of {} pages per second",
+ Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages));
// Quit if we dropped below threshold too many times
if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
@@ -336,8 +333,7 @@ public class Fetcher extends NutchTool implements Tool,
averageBdwPerThread = Math.round(bpsSinceLastCheck
/ activeThreads.get());
- LOG.info("averageBdwPerThread : " + (averageBdwPerThread / 1000)
- + " kbps");
+ LOG.info("averageBdwPerThread : {} kbps", (averageBdwPerThread / 1000));
if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) {
// check whether it is worth doing e.g. more queues than threads
@@ -354,10 +350,8 @@ public class Fetcher extends NutchTool implements Tool,
// availableThreads and additionalThreads)
additionalThreads = (availableThreads < additionalThreads ? availableThreads
: additionalThreads);
- LOG.info("Has space for more threads ("
- + (bpsSinceLastCheck / 1000) + " vs "
- + (targetBandwidth / 1000) + " kbps) \t=> adding "
- + additionalThreads + " new threads");
+ LOG.info("Has space for more threads ({} vs {} kbps) \t=> adding {} new threads",
+ (bpsSinceLastCheck / 1000), (targetBandwidth / 1000), additionalThreads);
// activate new threads
for (int i = 0; i < additionalThreads; i++) {
FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
@@ -373,9 +367,8 @@ public class Fetcher extends NutchTool implements Tool,
// bandwidth, we have to stop some threads
long excessBdw = bpsSinceLastCheck - targetBandwidth;
int excessThreads = Math.round(excessBdw / averageBdwPerThread);
- LOG.info("Exceeding target bandwidth (" + bpsSinceLastCheck / 1000
- + " vs " + (targetBandwidth / 1000)
- + " kbps). \t=> excessThreads = " + excessThreads);
+ LOG.info("Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}",
+ bpsSinceLastCheck / 1000, (targetBandwidth / 1000), excessThreads);
// keep at least one
if (excessThreads >= fetcherThreads.size())
excessThreads = 0;
@@ -399,12 +392,11 @@ public class Fetcher extends NutchTool implements Tool,
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
if (LOG.isWarnEnabled()) {
- LOG.warn("Aborting with " + activeThreads + " hung threads.");
+ LOG.warn("Aborting with {} hung threads.", activeThreads);
for (int i = 0; i < fetcherThreads.size(); i++) {
FetcherThread thread = fetcherThreads.get(i);
if (thread.isAlive()) {
- LOG.warn("Thread #" + i + " hung while processing "
- + thread.getReprUrl());
+ LOG.warn("Thread #{} hung while processing {}", i, thread.getReprUrl());
if (LOG.isDebugEnabled()) {
StackTraceElement[] stack = thread.getStackTrace();
StringBuilder sb = new StringBuilder();
@@ -421,7 +413,7 @@ public class Fetcher extends NutchTool implements Tool,
}
} while (activeThreads.get() > 0);
- LOG.info("-activeThreads=" + activeThreads);
+ LOG.info("-activeThreads={}", activeThreads);
}
@@ -432,8 +424,8 @@ public class Fetcher extends NutchTool implements Tool,
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
if (LOG.isInfoEnabled()) {
- LOG.info("Fetcher: starting at " + sdf.format(start));
- LOG.info("Fetcher: segment: " + segment);
+ LOG.info("Fetcher: starting at {}", sdf.format(start));
+ LOG.info("Fetcher: segment: {}", segment);
}
// set the actual time for the timelimit relative
@@ -442,7 +434,7 @@ public class Fetcher extends NutchTool implements Tool,
long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
if (timelimit != -1) {
timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
- LOG.info("Fetcher Timelimit set for : " + timelimit);
+ LOG.info("Fetcher Timelimit set for : {}", timelimit);
getConf().setLong("fetcher.timelimit", timelimit);
}
@@ -455,8 +447,8 @@ public class Fetcher extends NutchTool implements Tool,
int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
if (maxOutlinkDepth > 0) {
- LOG.info("Fetcher: following outlinks up to depth: "
- + Integer.toString(maxOutlinkDepth));
+ LOG.info("Fetcher: following outlinks up to depth: {}",
+ Integer.toString(maxOutlinkDepth));
int maxOutlinkDepthNumLinks = getConf().getInt(
"fetcher.follow.outlinks.num.links", 4);
@@ -469,8 +461,8 @@ public class Fetcher extends NutchTool implements Tool,
/ (i + 1) * maxOutlinkDepthNumLinks);
}
- LOG.info("Fetcher: maximum outlinks to follow: "
- + Integer.toString(totalOutlinksToFollow));
+ LOG.info("Fetcher: maximum outlinks to follow: {}",
+ Integer.toString(totalOutlinksToFollow));
}
JobConf job = new NutchJob(getConf());
@@ -496,8 +488,8 @@ public class Fetcher extends NutchTool implements Tool,
JobClient.runJob(job);
long end = System.currentTimeMillis();
- LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: "
- + TimingUtil.elapsedTime(start, end));
+ LOG.info("Fetcher: finished at {}, elapsed: {}", sdf.format(end),
+ TimingUtil.elapsedTime(start, end));
}
/** Run the fetcher. */
@@ -518,7 +510,6 @@ public class Fetcher extends NutchTool implements Tool,
Path segment = new Path(args[0]);
int threads = getConf().getInt("fetcher.threads.fetch", 10);
- boolean parsing = false;
for (int i = 1; i < args.length; i++) { // parse command line
if (args[i].equals("-threads")) { // found -threads option
@@ -532,7 +523,7 @@ public class Fetcher extends NutchTool implements Tool,
fetch(segment, threads);
return 0;
} catch (Exception e) {
- LOG.error("Fetcher: " + StringUtils.stringifyException(e));
+ LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
return -1;
}
@@ -554,7 +545,7 @@ public class Fetcher extends NutchTool implements Tool,
private AtomicInteger getActiveThreads() {
return activeThreads;
}
-
+
@Override
public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
@@ -588,7 +579,6 @@ public class Fetcher extends NutchTool implements Tool,
int threads = getConf().getInt("fetcher.threads.fetch", 10);
- boolean parsing = false;
// parse command line
if (args.containsKey("threads")) { // found -threads option
@@ -601,7 +591,7 @@ public class Fetcher extends NutchTool implements Tool,
results.put(Nutch.VAL_RESULT, Integer.toString(0));
return results;
} catch (Exception e) {
- LOG.error("Fetcher: " + StringUtils.stringifyException(e));
+ LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
results.put(Nutch.VAL_RESULT, Integer.toString(-1));
return results;
}
http://git-wip-us.apache.org/repos/asf/nutch/blob/df5eed2e/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java b/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
index 89969ee..d526a07 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
@@ -58,7 +58,7 @@ public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
final JobConf job, final String name, final Progressable progress)
- throws IOException {
+ throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
@@ -71,7 +71,7 @@ public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {
org.apache.hadoop.io.SequenceFile.Writer.Option fValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class);
org.apache.hadoop.io.SequenceFile.Writer.Option fProgressOpt = SequenceFile.Writer.progressable(progress);
org.apache.hadoop.io.SequenceFile.Writer.Option fCompOpt = SequenceFile.Writer.compression(compType);
-
+
final MapFile.Writer fetchOut = new MapFile.Writer(job,
fetch, fKeyClassOpt, fValClassOpt, fCompOpt, fProgressOpt);
[2/2] nutch git commit: Merge branch 'NUTCH-2262' of https://github.com/lewismc/nutch this closes #112
Posted by le...@apache.org.
Merge branch 'NUTCH-2262' of https://github.com/lewismc/nutch this closes #112
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/d96c936b
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/d96c936b
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/d96c936b
Branch: refs/heads/master
Commit: d96c936b678d4d733a3846c5f1b2f455e19ade0e
Parents: abc0117 df5eed2
Author: Lewis John McGibbney <le...@gmail.com>
Authored: Wed Jun 29 08:16:38 2016 -0700
Committer: Lewis John McGibbney <le...@gmail.com>
Committed: Wed Jun 29 08:16:38 2016 -0700
----------------------------------------------------------------------
.../org/apache/nutch/fetcher/FetchItem.java | 2 +-
src/java/org/apache/nutch/fetcher/Fetcher.java | 82 +++++++++-----------
.../nutch/fetcher/FetcherOutputFormat.java | 4 +-
3 files changed, 39 insertions(+), 49 deletions(-)
----------------------------------------------------------------------