Posted to commits@hbase.apache.org by ap...@apache.org on 2012/05/17 21:15:28 UTC

svn commit: r1339806 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase: client/ScannerCallable.java mapred/TableRecordReaderImpl.java mapreduce/TableRecordReaderImpl.java

Author: apurtell
Date: Thu May 17 19:15:28 2012
New Revision: 1339806

URL: http://svn.apache.org/viewvc?rev=1339806&view=rev
Log:
HBASE-6004. Add more logging to help debug MR jobs (Jimmy Xiang)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
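
The commit introduces three tunables: hbase.client.log.scanner.activity
(boolean, default false), hbase.client.log.scanner.latency.cutoff
(milliseconds, default 1000) and hbase.mapreduce.log.scanner.rowcount
(rows, default 100). A minimal sketch of enabling them from client code,
assuming the stock HBaseConfiguration bootstrap; the wrapper class below is
illustrative and not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.ScannerCallable;
    import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;

    public class ScannerLoggingSetup {
      /** Returns a client Configuration with the new scanner logging enabled. */
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Master switch for the new logging, off by default.
        conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
        // Log fetches slower than 500ms (default cutoff is 1000ms).
        conf.setInt(ScannerCallable.LOG_SCANNER_LATENCY_CUTOFF, 500);
        // Record readers report elapsed time every 1000 rows (default 100).
        conf.setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 1000);
        return conf;
      }
    }

The same keys can just as well be set in hbase-site.xml or on the job
configuration; only hbase.client.log.scanner.activity is needed to turn the
new logging on.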

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1339806&r1=1339805&r2=1339806&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Thu May 17 19:15:28 2012
@@ -25,10 +25,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -48,6 +51,10 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class ScannerCallable extends ServerCallable<Result[]> {
+  public static final String LOG_SCANNER_LATENCY_CUTOFF
+    = "hbase.client.log.scanner.latency.cutoff";
+  public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
+
   private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
   private long scannerId = -1L;
   private boolean instantiated = false;
@@ -55,6 +62,8 @@ public class ScannerCallable extends Ser
   private Scan scan;
   private int caching = 1;
   private ScanMetrics scanMetrics;
+  private boolean logScannerActivity = false;
+  private int logCutOffLatency = 1000;
 
   // indicate if it is a remote server call
   private boolean isRegionServerRemote = true;
@@ -71,6 +80,9 @@ public class ScannerCallable extends Ser
     super(connection, tableName, scan.getStartRow());
     this.scan = scan;
     this.scanMetrics = scanMetrics;
+    Configuration conf = connection.getConfiguration();
+    logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
+    logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
   }
 
   /**
@@ -129,7 +141,16 @@ public class ScannerCallable extends Ser
             RequestConverter.buildScanRequest(scannerId, caching, false);
           try {
+            long timestamp = System.currentTimeMillis();
             ScanResponse response = server.scan(null, request);
             rrs = ResponseConverter.getResults(response);
+            if (logScannerActivity) {
+              long now = System.currentTimeMillis();
+              if (now - timestamp > logCutOffLatency) {
+                int rows = rrs == null ? 0 : rrs.length;
+                LOG.info("Took " + (now-timestamp) + "ms to fetch "
+                  + rows + " rows from scanner=" + scannerId);
+              }
+            }
             if (response.hasMoreResults()
                 && !response.getMoreResults()) {
               scannerId = -1L;
@@ -141,10 +162,25 @@ public class ScannerCallable extends Ser
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
+          if (logScannerActivity) {
+            LOG.info("Got exception in fetching from scanner="
+              + scannerId, e);
+          }
           IOException ioe = e;
           if (e instanceof RemoteException) {
             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
           }
+          if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
+            try {
+              HRegionLocation location =
+                connection.relocateRegion(tableName, scan.getStartRow());
+              LOG.info("Scanner=" + scannerId
+                + " expired, current region location is " + location.toString()
+                + " ip:" + location.getServerAddress().getBindAddress());
+            } catch (Throwable t) {
+              LOG.info("Failed to relocate region", t);
+            }
+          }
           if (ioe instanceof NotServingRegionException) {
             // Throw a DNRE so that we break out of cycle of calling NSRE
             // when what we need is to open scanner against new location.
@@ -221,7 +257,13 @@ public class ScannerCallable extends Ser
         this.scan, 0, false);
     try {
       ScanResponse response = server.scan(null, request);
-      return response.getScannerId();
+      long id = response.getScannerId();
+      if (logScannerActivity) {
+        LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
+          + " on region " + this.location.toString() + " ip:"
+          + this.location.getServerAddress().getBindAddress());
+      }
+      return id;
     } catch (ServiceException se) {
       throw ProtobufUtil.getRemoteException(se);
     }
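
With the activity flag on, ScannerCallable.call() timestamps each fetch and
logs any that exceed the cutoff, logs the scanner id when a scanner is
opened, and on UnknownScannerException relocates the region so the log shows
where an expired scanner would reopen. The timing idiom, reduced to a
standalone sketch; the abstract wrapper and fetch() hook are illustrative
stand-ins for the real server.scan(...) call:

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.client.Result;

    /** Illustrative only; ScannerCallable inlines this logic around its RPC. */
    public abstract class TimedFetch {
      private static final Log LOG = LogFactory.getLog(TimedFetch.class);
      private final int cutoffMs;

      protected TimedFetch(int cutoffMs) {
        this.cutoffMs = cutoffMs;
      }

      /** Subclasses supply the real work, e.g. one scanner RPC. */
      protected abstract Result[] fetch() throws IOException;

      public Result[] timedFetch() throws IOException {
        long start = System.currentTimeMillis();
        Result[] rows = fetch();
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > cutoffMs) {
          // Only slow fetches are logged, so the flag is cheap to leave on.
          int count = rows == null ? 0 : rows.length;
          LOG.info("Took " + elapsed + "ms to fetch " + count + " rows");
        }
        return rows;
      }
    }

The abstract-hook shape is only for the sketch's benefit, though it loosely
mirrors how ScannerCallable itself extends ServerCallable.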

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=1339806&r1=1339805&r2=1339806&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java Thu May 17 19:15:28 2012
@@ -23,11 +23,13 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
 
 /**
  * Iterate over an HBase table data, return (Text, RowResult) pairs
@@ -49,6 +52,10 @@ public class TableRecordReaderImpl {
   private ResultScanner scanner;
   private HTable htable;
   private byte [][] trrInputColumns;
+  private long timestamp;
+  private int rowcount;
+  private boolean logScannerActivity = false;
+  private int logPerRowCount = 100;
 
   /**
    * Restart from survivable exceptions by creating a new scanner.
@@ -57,6 +64,7 @@ public class TableRecordReaderImpl {
    * @throws IOException
    */
   public void restart(byte[] firstRow) throws IOException {
+    Scan currentScan;
     if ((endRow != null) && (endRow.length > 0)) {
       if (trrRowFilter != null) {
         Scan scan = new Scan(firstRow, endRow);
@@ -64,6 +72,7 @@ public class TableRecordReaderImpl {
         scan.setFilter(trrRowFilter);
         scan.setCacheBlocks(false);
         this.scanner = this.htable.getScanner(scan);
+        currentScan = scan;
       } else {
         LOG.debug("TIFB.restart, firstRow: " +
             Bytes.toStringBinary(firstRow) + ", endRow: " +
@@ -71,6 +80,7 @@ public class TableRecordReaderImpl {
         Scan scan = new Scan(firstRow, endRow);
         TableInputFormat.addColumns(scan, trrInputColumns);
         this.scanner = this.htable.getScanner(scan);
+        currentScan = scan;
       }
     } else {
       LOG.debug("TIFB.restart, firstRow: " +
@@ -80,6 +90,12 @@ public class TableRecordReaderImpl {
       TableInputFormat.addColumns(scan, trrInputColumns);
       scan.setFilter(trrRowFilter);
       this.scanner = this.htable.getScanner(scan);
+      currentScan = scan;
+    }
+    if (logScannerActivity) {
+      LOG.info("Current scan=" + currentScan.toString());
+      timestamp = System.currentTimeMillis();
+      rowcount = 0;
     }
   }
 
@@ -99,6 +115,10 @@ public class TableRecordReaderImpl {
    * @param htable the {@link HTable} to scan.
    */
   public void setHTable(HTable htable) {
+    Configuration conf = htable.getConfiguration();
+    logScannerActivity = conf.getBoolean(
+      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
     this.htable = htable;
   }
 
@@ -174,32 +194,55 @@ public class TableRecordReaderImpl {
   throws IOException {
     Result result;
     try {
-      result = this.scanner.next();
-    } catch (DoNotRetryIOException e) {
-      throw e;
-    } catch (IOException e) {
-      LOG.debug("recovered from " + StringUtils.stringifyException(e));
-      if (lastSuccessfulRow == null) {
-        LOG.warn("We are restarting the first next() invocation," +
-            " if your mapper's restarted a few other times like this" +
-            " then you should consider killing this job and investigate" +
-            " why it's taking so long.");
-      }
-      if (lastSuccessfulRow == null) {
-        restart(startRow);
-      } else {
-        restart(lastSuccessfulRow);
-        this.scanner.next();    // skip presumed already mapped row
+      try {
+        result = this.scanner.next();
+        if (logScannerActivity) {
+          rowcount ++;
+          if (rowcount >= logPerRowCount) {
+            long now = System.currentTimeMillis();
+            LOG.info("Mapper took " + (now-timestamp)
+              + "ms to process " + rowcount + " rows");
+            timestamp = now;
+            rowcount = 0;
+          }
+        }
+      } catch (DoNotRetryIOException e) {
+        throw e;
+      } catch (IOException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        if (lastSuccessfulRow == null) {
+          LOG.warn("We are restarting the first next() invocation," +
+              " if your mapper has restarted a few other times like this" +
+              " then you should consider killing this job and investigate" +
+              " why it's taking so long.");
+        }
+        if (lastSuccessfulRow == null) {
+          restart(startRow);
+        } else {
+          restart(lastSuccessfulRow);
+          this.scanner.next();    // skip presumed already mapped row
+        }
+        result = this.scanner.next();
       }
-      result = this.scanner.next();
-    }
 
-    if (result != null && result.size() > 0) {
-      key.set(result.getRow());
-      lastSuccessfulRow = key.get();
-      Writables.copyWritable(result, value);
-      return true;
+      if (result != null && result.size() > 0) {
+        key.set(result.getRow());
+        lastSuccessfulRow = key.get();
+        Writables.copyWritable(result, value);
+        return true;
+      }
+      return false;
+    } catch (IOException ioe) {
+      if (logScannerActivity) {
+        long now = System.currentTimeMillis();
+        LOG.info("Mapper took " + (now-timestamp)
+          + "ms to process " + rowcount + " rows");
+        LOG.info(ioe);
+        String lastRow = lastSuccessfulRow == null ?
+          "null" : Bytes.toStringBinary(lastSuccessfulRow);
+        LOG.info("lastSuccessfulRow=" + lastRow);
+      }
+      throw ioe;
     }
-    return false;
   }
 }
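
Both record readers (mapred above, mapreduce below) use the same progress
idiom: count rows returned by next(), and once the count reaches the
configured interval, log the elapsed time and reset both counter and clock.
Reduced to a standalone sketch; the class below is illustrative and not part
of the commit:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    /** Illustrative only; both TableRecordReaderImpl classes inline this logic. */
    public class RowProgressLogger {
      private static final Log LOG = LogFactory.getLog(RowProgressLogger.class);
      private final int logPerRowCount;
      private long timestamp = System.currentTimeMillis();
      private int rowcount = 0;

      public RowProgressLogger(int logPerRowCount) {
        this.logPerRowCount = logPerRowCount;  // the patch defaults this to 100
      }

      /** Call once for each row returned by scanner.next(). */
      public void onRow() {
        if (++rowcount >= logPerRowCount) {
          long now = System.currentTimeMillis();
          LOG.info("Mapper took " + (now - timestamp)
              + "ms to process " + rowcount + " rows");
          timestamp = now;
          rowcount = 0;
        }
      }
    }

Resetting both fields keeps each log line a per-interval measurement rather
than a cumulative average, which makes throughput drops easy to spot in the
mapper logs.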

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java?rev=1339806&r1=1339805&r2=1339806&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java Thu May 17 19:15:28 2012
@@ -24,11 +24,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,7 +48,8 @@ import org.apache.hadoop.util.StringUtil
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class TableRecordReaderImpl {
-
+  public static final String LOG_PER_ROW_COUNT
+    = "hbase.mapreduce.log.scanner.rowcount";
 
   static final Log LOG = LogFactory.getLog(TableRecordReader.class);
 
@@ -62,6 +65,10 @@ public class TableRecordReaderImpl {
   private Result value = null;
   private TaskAttemptContext context = null;
   private Method getCounter = null;
+  private long timestamp;
+  private int rowcount;
+  private boolean logScannerActivity = false;
+  private int logPerRowCount = 100;
 
   /**
    * Restart from survivable exceptions by creating a new scanner.
@@ -75,6 +82,11 @@ public class TableRecordReaderImpl {
     currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
       Bytes.toBytes(Boolean.TRUE));
     this.scanner = this.htable.getScanner(currentScan);
+    if (logScannerActivity) {
+      LOG.info("Current scan=" + currentScan.toString());
+      timestamp = System.currentTimeMillis();
+      rowcount = 0;
+    }
   }
 
   /**
@@ -103,6 +115,10 @@ public class TableRecordReaderImpl {
    * @param htable  The {@link HTable} to scan.
    */
   public void setHTable(HTable htable) {
+    Configuration conf = htable.getConfiguration();
+    logScannerActivity = conf.getBoolean(
+      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
+    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
     this.htable = htable;
   }
 
@@ -174,33 +190,56 @@ public class TableRecordReaderImpl {
     if (key == null) key = new ImmutableBytesWritable();
     if (value == null) value = new Result();
     try {
-      value = this.scanner.next();
-    } catch (DoNotRetryIOException e) {
-      throw e;
-    } catch (IOException e) {
-      LOG.info("recovered from " + StringUtils.stringifyException(e));
-      if (lastSuccessfulRow == null) {
-        LOG.warn("We are restarting the first next() invocation," +
-            " if your mapper's restarted a few other times like this" +
-            " then you should consider killing this job and investigate" +
-            " why it's taking so long.");
+      try {
+        value = this.scanner.next();
+        if (logScannerActivity) {
+          rowcount ++;
+          if (rowcount >= logPerRowCount) {
+            long now = System.currentTimeMillis();
+            LOG.info("Mapper took " + (now-timestamp)
+              + "ms to process " + rowcount + " rows");
+            timestamp = now;
+            rowcount = 0;
+          }
+        }
+      } catch (DoNotRetryIOException e) {
+        throw e;
+      } catch (IOException e) {
+        LOG.info("recovered from " + StringUtils.stringifyException(e));
+        if (lastSuccessfulRow == null) {
+          LOG.warn("We are restarting the first next() invocation," +
+              " if your mapper has restarted a few other times like this" +
+              " then you should consider killing this job and investigate" +
+              " why it's taking so long.");
+        }
+        if (lastSuccessfulRow == null) {
+          restart(scan.getStartRow());
+        } else {
+          restart(lastSuccessfulRow);
+          scanner.next();    // skip presumed already mapped row
+        }
+        value = scanner.next();
       }
-      if (lastSuccessfulRow == null) {
-        restart(scan.getStartRow());
-      } else {
-        restart(lastSuccessfulRow);
-        scanner.next();    // skip presumed already mapped row
+      if (value != null && value.size() > 0) {
+        key.set(value.getRow());
+        lastSuccessfulRow = key.get();
+        return true;
       }
-      value = scanner.next();
-    }
-    if (value != null && value.size() > 0) {
-      key.set(value.getRow());
-      lastSuccessfulRow = key.get();
-      return true;
-    }
 
-    updateCounters();
-    return false;
+      updateCounters();
+      return false;
+    } catch (IOException ioe) {
+      if (logScannerActivity) {
+        long now = System.currentTimeMillis();
+        LOG.info("Mapper took " + (now-timestamp)
+          + "ms to process " + rowcount + " rows");
+        LOG.info(ioe);
+        String lastRow = lastSuccessfulRow == null ?
+          "null" : Bytes.toStringBinary(lastSuccessfulRow);
+        LOG.info("lastSuccessfulRow=" + lastRow);
+      }
+      throw ioe;
+    }
   }
 
   /**