Posted to commits@hbase.apache.org by ap...@apache.org on 2012/05/17 21:15:28 UTC
svn commit: r1339806 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase:
client/ScannerCallable.java mapred/TableRecordReaderImpl.java
mapreduce/TableRecordReaderImpl.java
Author: apurtell
Date: Thu May 17 19:15:28 2012
New Revision: 1339806
URL: http://svn.apache.org/viewvc?rev=1339806&view=rev
Log:
HBASE-6004. Adding more logging to help debug MR jobs (Jimmy Xiang)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
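For reference, a minimal sketch of how a client job might turn the new logging on.
The two property names and their defaults (activity logging off, 1000ms cutoff) come
straight from the patch below; the 500ms value is purely illustrative:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class EnableScannerLogging {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Log scanner opens, fetches, and expirations (default: false).
      conf.setBoolean("hbase.client.log.scanner.activity", true);
      // Log any fetch slower than 500ms (default cutoff: 1000ms).
      conf.setInt("hbase.client.log.scanner.latency.cutoff", 500);
    }
  }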
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1339806&r1=1339805&r2=1339806&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Thu May 17 19:15:28 2012
@@ -25,10 +25,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -48,6 +51,10 @@ import com.google.protobuf.ServiceExcept
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ScannerCallable extends ServerCallable<Result[]> {
+ public static final String LOG_SCANNER_LATENCY_CUTOFF
+ = "hbase.client.log.scanner.latency.cutoff";
+ public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
+
private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
private long scannerId = -1L;
private boolean instantiated = false;
@@ -55,6 +62,8 @@ public class ScannerCallable extends Ser
private Scan scan;
private int caching = 1;
private ScanMetrics scanMetrics;
+ private boolean logScannerActivity = false;
+ private int logCutOffLatency = 1000;
// indicate if it is a remote server call
private boolean isRegionServerRemote = true;
@@ -71,6 +80,9 @@ public class ScannerCallable extends Ser
super(connection, tableName, scan.getStartRow());
this.scan = scan;
this.scanMetrics = scanMetrics;
+ Configuration conf = connection.getConfiguration();
+ logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
+ logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
}
/**
@@ -129,7 +141,16 @@ public class ScannerCallable extends Ser
RequestConverter.buildScanRequest(scannerId, caching, false);
try {
ScanResponse response = server.scan(null, request);
+ long timestamp = System.currentTimeMillis();
rrs = ResponseConverter.getResults(response);
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ if (now - timestamp > logCutOffLatency) {
+ int rows = rrs == null ? 0 : rrs.length;
+ LOG.info("Took " + (now-timestamp) + "ms to fetch "
+ + rows + " rows from scanner=" + scannerId);
+ }
+ }
if (response.hasMoreResults()
&& !response.getMoreResults()) {
scannerId = -1L;
@@ -141,10 +162,25 @@ public class ScannerCallable extends Ser
}
updateResultsMetrics(rrs);
} catch (IOException e) {
+ if (logScannerActivity) {
+ LOG.info("Got exception in fetching from scanner="
+ + scannerId, e);
+ }
IOException ioe = e;
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
+ if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
+ try {
+ HRegionLocation location =
+ connection.relocateRegion(tableName, scan.getStartRow());
+ LOG.info("Scanner=" + scannerId
+ + " expired, current region location is " + location.toString()
+ + " ip:" + location.getServerAddress().getBindAddress());
+ } catch (Throwable t) {
+ LOG.info("Failed to relocate region", t);
+ }
+ }
if (ioe instanceof NotServingRegionException) {
// Throw a DNRE so that we break out of cycle of calling NSRE
// when what we need is to open scanner against new location.
@@ -221,7 +257,13 @@ public class ScannerCallable extends Ser
this.scan, 0, false);
try {
ScanResponse response = server.scan(null, request);
- return response.getScannerId();
+ long id = response.getScannerId();
+ if (logScannerActivity) {
+ LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
+ + " on region " + this.location.toString() + " ip:"
+ + this.location.getServerAddress().getBindAddress());
+ }
+ return id;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
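In isolation, the latency-gated logging the patch adds to ScannerCallable.call() follows
this pattern: record a timestamp, do the work, and emit a log line only when the elapsed
time exceeds the configured cutoff. (In the patch the timestamp is taken after the scan
RPC returns, so the gated interval covers result conversion.) A minimal sketch, where
fetchBatch() is a hypothetical stand-in for the scan call:

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  public class LatencyGatedFetchSketch {
    private static final Log LOG = LogFactory.getLog(LatencyGatedFetchSketch.class);
    private final boolean logScannerActivity = true; // hbase.client.log.scanner.activity
    private final int logCutOffLatency = 1000;       // hbase.client.log.scanner.latency.cutoff

    // Hypothetical stand-in for the scan RPC plus result conversion.
    private String[] fetchBatch() throws InterruptedException {
      Thread.sleep(1500);
      return new String[] { "row-1", "row-2" };
    }

    public void fetchWithLogging() throws InterruptedException {
      long timestamp = System.currentTimeMillis();
      String[] rows = fetchBatch();
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        // Only fetches slower than the cutoff produce a log line.
        if (now - timestamp > logCutOffLatency) {
          int n = (rows == null) ? 0 : rows.length;
          LOG.info("Took " + (now - timestamp) + "ms to fetch " + n + " rows");
        }
      }
    }

    public static void main(String[] args) throws InterruptedException {
      new LatencyGatedFetchSketch().fetchWithLogging();
    }
  }

Because the flag defaults to off and the check is a single comparison, enabling it in
production costs nothing until a fetch actually crosses the cutoff.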
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=1339806&r1=1339805&r2=1339806&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java Thu May 17 19:15:28 2012
@@ -23,11 +23,13 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
/**
* Iterate over an HBase table data, return (Text, RowResult) pairs
@@ -49,6 +52,10 @@ public class TableRecordReaderImpl {
private ResultScanner scanner;
private HTable htable;
private byte [][] trrInputColumns;
+ private long timestamp;
+ private int rowcount;
+ private boolean logScannerActivity = false;
+ private int logPerRowCount = 100;
/**
* Restart from survivable exceptions by creating a new scanner.
@@ -57,6 +64,7 @@ public class TableRecordReaderImpl {
* @throws IOException
*/
public void restart(byte[] firstRow) throws IOException {
+ Scan currentScan;
if ((endRow != null) && (endRow.length > 0)) {
if (trrRowFilter != null) {
Scan scan = new Scan(firstRow, endRow);
@@ -64,6 +72,7 @@ public class TableRecordReaderImpl {
scan.setFilter(trrRowFilter);
scan.setCacheBlocks(false);
this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
} else {
LOG.debug("TIFB.restart, firstRow: " +
Bytes.toStringBinary(firstRow) + ", endRow: " +
@@ -71,6 +80,7 @@ public class TableRecordReaderImpl {
Scan scan = new Scan(firstRow, endRow);
TableInputFormat.addColumns(scan, trrInputColumns);
this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
}
} else {
LOG.debug("TIFB.restart, firstRow: " +
@@ -80,6 +90,12 @@ public class TableRecordReaderImpl {
TableInputFormat.addColumns(scan, trrInputColumns);
scan.setFilter(trrRowFilter);
this.scanner = this.htable.getScanner(scan);
+ currentScan = scan;
+ }
+ if (logScannerActivity) {
+ LOG.info("Current scan=" + currentScan.toString());
+ timestamp = System.currentTimeMillis();
+ rowcount = 0;
}
}
@@ -99,6 +115,10 @@ public class TableRecordReaderImpl {
* @param htable the {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(
+ ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
this.htable = htable;
}
@@ -174,32 +194,55 @@ public class TableRecordReaderImpl {
throws IOException {
Result result;
try {
- result = this.scanner.next();
- } catch (DoNotRetryIOException e) {
- throw e;
- } catch (IOException e) {
- LOG.debug("recovered from " + StringUtils.stringifyException(e));
- if (lastSuccessfulRow == null) {
- LOG.warn("We are restarting the first next() invocation," +
- " if your mapper's restarted a few other times like this" +
- " then you should consider killing this job and investigate" +
- " why it's taking so long.");
- }
- if (lastSuccessfulRow == null) {
- restart(startRow);
- } else {
- restart(lastSuccessfulRow);
- this.scanner.next(); // skip presumed already mapped row
+ try {
+ result = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount ++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
+ }
+ }
+ } catch (DoNotRetryIOException e) {
+ throw e;
+ } catch (IOException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation," +
+ " if your mapper has restarted a few other times like this" +
+ " then you should consider killing this job and investigate" +
+ " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restart(startRow);
+ } else {
+ restart(lastSuccessfulRow);
+ this.scanner.next(); // skip presumed already mapped row
+ }
+ result = this.scanner.next();
}
- result = this.scanner.next();
- }
- if (result != null && result.size() > 0) {
- key.set(result.getRow());
- lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
- return true;
+ if (result != null && result.size() > 0) {
+ key.set(result.getRow());
+ lastSuccessfulRow = key.get();
+ Writables.copyWritable(result, value);
+ return true;
+ }
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ LOG.info(ioe);
+ String lastRow = lastSuccessfulRow == null ?
+ "null" : Bytes.toStringBinary(lastSuccessfulRow);
+ LOG.info("lastSuccessfulRow=" + lastRow);
+ }
+ throw ioe;
}
- return false;
}
}
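Both record readers (this one and the mapreduce variant below) add the same every-N-rows
progress logging: count rows, and on each batch of logPerRowCount rows log the elapsed
time and reset the counters, so steady progress yields one line per batch rather than one
per row. Factored out, the pattern is roughly the sketch below; the class name is
hypothetical and the batch size comes from hbase.mapreduce.log.scanner.rowcount:

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;

  /** Minimal sketch of the per-row-count progress logging added above. */
  public class RowProgressSketch {
    private static final Log LOG = LogFactory.getLog(RowProgressSketch.class);
    private final int logPerRowCount;  // hbase.mapreduce.log.scanner.rowcount (default 100)
    private long timestamp = System.currentTimeMillis();
    private int rowcount = 0;

    public RowProgressSketch(int logPerRowCount) {
      this.logPerRowCount = logPerRowCount;
    }

    /** Call once per row handed to the mapper. */
    public void onRow() {
      rowcount++;
      if (rowcount >= logPerRowCount) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
            + "ms to process " + rowcount + " rows");
        timestamp = now;
        rowcount = 0;
      }
    }

    public static void main(String[] args) {
      RowProgressSketch progress = new RowProgressSketch(100);
      for (int i = 0; i < 250; i++) {
        progress.onRow();  // logs after rows 100 and 200
      }
    }
  }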
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java?rev=1339806&r1=1339805&r2=1339806&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java Thu May 17 19:15:28 2012
@@ -24,11 +24,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -46,7 +48,8 @@ import org.apache.hadoop.util.StringUtil
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
-
+ public static final String LOG_PER_ROW_COUNT
+ = "hbase.mapreduce.log.scanner.rowcount";
static final Log LOG = LogFactory.getLog(TableRecordReader.class);
@@ -62,6 +65,10 @@ public class TableRecordReaderImpl {
private Result value = null;
private TaskAttemptContext context = null;
private Method getCounter = null;
+ private long timestamp;
+ private int rowcount;
+ private boolean logScannerActivity = false;
+ private int logPerRowCount = 100;
/**
* Restart from survivable exceptions by creating a new scanner.
@@ -75,6 +82,11 @@ public class TableRecordReaderImpl {
currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
Bytes.toBytes(Boolean.TRUE));
this.scanner = this.htable.getScanner(currentScan);
+ if (logScannerActivity) {
+ LOG.info("Current scan=" + currentScan.toString());
+ timestamp = System.currentTimeMillis();
+ rowcount = 0;
+ }
}
/**
@@ -103,6 +115,10 @@ public class TableRecordReaderImpl {
* @param htable The {@link HTable} to scan.
*/
public void setHTable(HTable htable) {
+ Configuration conf = htable.getConfiguration();
+ logScannerActivity = conf.getBoolean(
+ ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+ logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
this.htable = htable;
}
@@ -174,33 +190,56 @@ public class TableRecordReaderImpl {
if (key == null) key = new ImmutableBytesWritable();
if (value == null) value = new Result();
try {
- value = this.scanner.next();
- } catch (DoNotRetryIOException e) {
- throw e;
- } catch (IOException e) {
- LOG.info("recovered from " + StringUtils.stringifyException(e));
- if (lastSuccessfulRow == null) {
- LOG.warn("We are restarting the first next() invocation," +
- " if your mapper's restarted a few other times like this" +
- " then you should consider killing this job and investigate" +
- " why it's taking so long.");
+ try {
+ value = this.scanner.next();
+ if (logScannerActivity) {
+ rowcount ++;
+ if (rowcount >= logPerRowCount) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ timestamp = now;
+ rowcount = 0;
+ }
+ }
+ } catch (DoNotRetryIOException e) {
+ throw e;
+ } catch (IOException e) {
+ LOG.info("recovered from " + StringUtils.stringifyException(e));
+ if (lastSuccessfulRow == null) {
+ LOG.warn("We are restarting the first next() invocation," +
+ " if your mapper has restarted a few other times like this" +
+ " then you should consider killing this job and investigate" +
+ " why it's taking so long.");
+ }
+ if (lastSuccessfulRow == null) {
+ restart(scan.getStartRow());
+ } else {
+ restart(lastSuccessfulRow);
+ scanner.next(); // skip presumed already mapped row
+ }
+ value = scanner.next();
}
- if (lastSuccessfulRow == null) {
- restart(scan.getStartRow());
- } else {
- restart(lastSuccessfulRow);
- scanner.next(); // skip presumed already mapped row
+ if (value != null && value.size() > 0) {
+ key.set(value.getRow());
+ lastSuccessfulRow = key.get();
+ return true;
}
- value = scanner.next();
- }
- if (value != null && value.size() > 0) {
- key.set(value.getRow());
- lastSuccessfulRow = key.get();
- return true;
- }
- updateCounters();
- return false;
+ updateCounters();
+ return false;
+ } catch (IOException ioe) {
+ if (logScannerActivity) {
+ long now = System.currentTimeMillis();
+ LOG.info("Mapper took " + (now-timestamp)
+ + "ms to process " + rowcount + " rows");
+ LOG.info(ioe);
+ String lastRow = lastSuccessfulRow == null ?
+ "null" : Bytes.toStringBinary(lastSuccessfulRow);
+ LOG.info("lastSuccessfulRow=" + lastRow);
+ }
+ throw ioe;
+ }
}
/**