Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:27:05 UTC

svn commit: r1181607 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: nspiegelberg
Date: Tue Oct 11 02:27:04 2011
New Revision: 1181607

URL: http://svn.apache.org/viewvc?rev=1181607&view=rev
Log:
Enhanced the MapReduce TableInputFormat to support any number of mappers per region.

Summary:
Currently, TableInputFormat-based MR jobs create exactly one mapper per region,
where each mapper is given one Scan with the appropriate start/stop row keys.
This change allows jobs to run with any number of mappers per region, so that
when a mapper fails, there is less data to reprocess.
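
For example, a job can opt in to finer-grained splits by setting the new
configuration key before the usual mapper setup. A minimal sketch (the table
name, mapper class, and surrounding job wiring are illustrative placeholders,
not part of this change):

  Configuration conf = new HBaseConfiguration();
  // The new knob added by this change: "hbase.mapreduce.mappersperregion".
  conf.setInt(TableInputFormat.MAPPERS_PER_REGION, 3);
  Job job = new Job(conf, "my-scan-job");
  TableMapReduceUtil.initTableMapperJob("mytable", new Scan(),
      MyMapper.class, ImmutableBytesWritable.class, Result.class, job);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.waitForCompletion(true);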

In general, each region's key range is subdivided into equal intervals using
BigInteger math; on the edge regions, whose outer boundary keys are empty and
cannot be subdivided directly, the interval of the neighboring region's finer
splits is extrapolated to approximate the dividing keys.
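
For instance, the new Bytes.arithmeticProgSeq helper extends the interval
between two keys past the second key, which is what the edge-region
approximation relies on. A small illustration with hypothetical one-byte keys:

  byte[] a = { 0x40 };
  byte[] b = { 0x50 };
  // Each result extends the a->b interval past b: result[i] = b + (b-a)*(i+1),
  // so this returns { {0x60}, {0x70} }.
  byte[][] approx = Bytes.arithmeticProgSeq(a, b, 2);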

Added a unit test, TestTableInputFormatNMappersPerRegion, for TableInputFormat
in the mapreduce package.

Test Plan: Passed unit tests in the mapreduce package.
Reviewers: kannan, kranganathan, nspiegelberg
Reviewed By: kannan
Commenters: kannan
CC: aaiyer, jgray

Differential Revision: 279530
Task ID: 620261

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatNMappersPerRegion.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Bytes.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1181607&r1=1181606&r2=1181607&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Tue Oct 11 02:27:04 2011
@@ -61,6 +61,8 @@ implements Configurable {
   public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
   /** The number of rows for caching that will be passed to scanners. */
   public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
+  /** The number of mappers that should be assigned to each region. */
+  public static final String MAPPERS_PER_REGION = "hbase.mapreduce.mappersperregion";
 
   /** The configuration. */
   private Configuration conf = null;
@@ -139,6 +141,10 @@ implements Configurable {
       }
     }
 
+    if (conf.get(MAPPERS_PER_REGION) != null) {
+      setNumMapperPerRegion(Integer.parseInt(conf.get(MAPPERS_PER_REGION)));
+    }
+
     setScan(scan);
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1181607&r1=1181606&r2=1181607&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Tue Oct 11 02:27:04 2011
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -78,7 +79,8 @@ extends InputFormat<ImmutableBytesWritab
   private HTable table = null;
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
-
+  /** The number of mappers to assign to each region. */
+  private int numMappersPerRegion = 1;
 
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
@@ -131,33 +133,85 @@ extends InputFormat<ImmutableBytesWritab
     if (table == null) {
       throw new IOException("No table was provided.");
     }
-    int count = 0;
-    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
-    for (int i = 0; i < keys.getFirst().length; i++) {
-      if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+    Pair<byte[][], byte[][]> splitKeys = null;
+    int numRegions = keys.getFirst().length;
+    // TODO: Can anything else be done when there are fewer than 3 regions?
+    if ((numMappersPerRegion == 1) || (numRegions < 3)) {
+      numMappersPerRegion = 1;
+      splitKeys = keys;
+    } else {
+      byte[][] startKeys = new byte[numRegions * numMappersPerRegion][];
+      byte[][] stopKeys = new byte[numRegions * numMappersPerRegion][];
+      // Insert empty boundary keys at the edges
+      startKeys[0] = HConstants.EMPTY_START_ROW;
+      stopKeys[numRegions * numMappersPerRegion - 1] = HConstants.EMPTY_END_ROW;
+      // Split the second region
+      byte[][] dividingKeys = Bytes.split(keys.getFirst()[1],
+          keys.getSecond()[1], numMappersPerRegion - 1);
+      int count = numMappersPerRegion - 1;
+      stopKeys[count] = keys.getSecond()[0];
+      // Use the interval between these splits to calculate the approximate
+      // dividing keys of the first region
+      for (byte[] approxKey : Bytes.arithmeticProgSeq(dividingKeys[1], dividingKeys[0],
+          numMappersPerRegion - 1)) {
+        startKeys[count--] = approxKey;
+        stopKeys[count] = approxKey;
+      }
+      // Add the second region dividing keys
+      for (int i = 0; i < numMappersPerRegion; i++) {
+        startKeys[numMappersPerRegion + i] = dividingKeys[i];
+        stopKeys[numMappersPerRegion + i] = dividingKeys[i + 1];
+      }
+      // Fill out all the split keys for center regions (3rd...(n-1)th)
+      for (int i = 2; i < numRegions - 1; i++) {
+        dividingKeys = Bytes.split(keys.getFirst()[i],
+            keys.getSecond()[i], numMappersPerRegion - 1);
+        for (int j = 0; j < numMappersPerRegion; j++) {
+          startKeys[i * numMappersPerRegion + j] = dividingKeys[j];
+          stopKeys[i * numMappersPerRegion + j] = dividingKeys[j + 1];
+        }
+      }
+      // Use the previous intervals to calc dividing keys of the last region
+      count = numMappersPerRegion * (numRegions - 1);
+      startKeys[count] = keys.getFirst()[numRegions - 1];
+      for (byte[] approxKey : Bytes.arithmeticProgSeq(dividingKeys[numMappersPerRegion - 1],
+          dividingKeys[numMappersPerRegion], numMappersPerRegion - 1)) {
+        stopKeys[count++] = approxKey;
+        startKeys[count] = approxKey;
+      }
+      splitKeys = new Pair<byte[][], byte[][]>();
+      splitKeys.setFirst(startKeys);
+      splitKeys.setSecond(stopKeys);
+    }
+    List<InputSplit> splits =
+        new ArrayList<InputSplit>(numRegions * numMappersPerRegion);
+    byte[] startRow = scan.getStartRow();
+    byte[] stopRow = scan.getStopRow();
+    int numSplits = 0;
+    for (int i = 0; i < numRegions * numMappersPerRegion; i++) {
+      if (!includeRegionInSplit(keys.getFirst()[i / numMappersPerRegion],
+          keys.getSecond()[i / numMappersPerRegion])) {
         continue;
       }
-      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
+      String regionLocation = table.getRegionLocation(splitKeys.getFirst()[i]).
         getServerAddress().getHostname();
-      byte[] startRow = scan.getStartRow();
-      byte[] stopRow = scan.getStopRow();
       // determine if the given start and stop keys fall into the region
-      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+      if ((startRow.length == 0 || splitKeys.getSecond()[i].length == 0 ||
+          Bytes.compareTo(startRow, splitKeys.getSecond()[i]) < 0) &&
           (stopRow.length == 0 ||
-           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+          Bytes.compareTo(stopRow, splitKeys.getFirst()[i]) > 0)) {
         byte[] splitStart = startRow.length == 0 ||
-          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-            keys.getFirst()[i] : startRow;
+            Bytes.compareTo(splitKeys.getFirst()[i], startRow) >= 0 ?
+                splitKeys.getFirst()[i] : startRow;
         byte[] splitStop = (stopRow.length == 0 ||
-          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-          keys.getSecond()[i].length > 0 ?
-            keys.getSecond()[i] : stopRow;
+            Bytes.compareTo(splitKeys.getSecond()[i], stopRow) <= 0) &&
+            splitKeys.getSecond()[i].length > 0 ?
+                splitKeys.getSecond()[i] : stopRow;
         InputSplit split = new TableSplit(table.getTableName(),
-          splitStart, splitStop, regionLocation);
+            splitStart, splitStop, regionLocation);
         splits.add(split);
         if (LOG.isDebugEnabled())
-          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+          LOG.debug("getSplits: split -> " + (numSplits++) + " -> " + split);
       }
     }
     return splits;
@@ -236,4 +290,17 @@ extends InputFormat<ImmutableBytesWritab
     this.tableRecordReader = tableRecordReader;
   }
 
+  /**
+   * Sets the number of mappers assigned to each region.
+   *
+   * @param num The number of mappers per region; must be positive.
+   * @throws IllegalArgumentException When <code>num</code> &lt;= 0.
+   */
+  public void setNumMapperPerRegion(int num) throws IllegalArgumentException {
+    if (num <= 0) {
+      throw new IllegalArgumentException("Expecting at least 1 mapper " +
+          "per region; instead got: " + num);
+    }
+    numMappersPerRegion = num;
+  }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1181607&r1=1181606&r2=1181607&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Tue Oct 11 02:27:04 2011
@@ -1064,7 +1064,7 @@ public class Bytes {
     if (compareTo(aPadded,bPadded) >= 0) {
       throw new IllegalArgumentException("b <= a");
     }
-    if (num <= 0) {
+    if (num < 0) {
       throw new IllegalArgumentException("num cannot be < 0");
     }
     byte [] prependHeader = {1, 0};
@@ -1085,6 +1085,7 @@ public class Bytes {
 
     final Iterator<byte[]> iterator = new Iterator<byte[]>() {
       private int i = -1;
+      private BigInteger curBI = startBI;
 
       @Override
       public boolean hasNext() {
@@ -1097,7 +1098,7 @@ public class Bytes {
         if (i == 0) return a;
         if (i == num + 1) return b;
 
-        BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i)));
+        curBI = curBI.add(intervalBI);
         byte [] padded = curBI.toByteArray();
         if (padded[1] == 0)
           padded = tail(padded, padded.length - 2);
@@ -1122,6 +1123,53 @@ public class Bytes {
   }
 
   /**
+   * Calculates the next <code>num</code> elements in an arithmetic
+   * progression sequence.
+   *
+   * @param a First element.
+   * @param b Second element.
+   * @param num Number of next elements to find.
+   * @return <code>num</code> byte arrays, each offset from the previous by
+   *         the interval from <code>a</code> to <code>b</code>, starting
+   *         from <code>b</code>. In other words, it returns an array of
+   *         size <code>num</code> whose element at index i is
+   *         b+(b-a)*(i+1). Uses BigInteger math.
+   */
+  public static byte[][] arithmeticProgSeq(byte[] a, byte[] b, int num) {
+    byte [][] result = new byte[num][];
+    byte [] aPadded;
+    byte [] bPadded;
+    if (a.length < b.length) {
+      aPadded = padTail(a, b.length - a.length);
+      bPadded = b;
+    } else if (b.length < a.length) {
+      aPadded = a;
+      bPadded = padTail(b, a.length - b.length);
+    } else {
+      aPadded = a;
+      bPadded = b;
+    }
+    if (num < 0) {
+      throw new IllegalArgumentException("num cannot be < 0");
+    }
+    byte [] prependHeader = {1, 0};
+    BigInteger startBI = new BigInteger(add(prependHeader, aPadded));
+    BigInteger stopBI = new BigInteger(add(prependHeader, bPadded));
+    BigInteger diffBI = stopBI.subtract(startBI);
+    BigInteger curBI = stopBI;
+    for (int i = 0; i < num; i++) {
+      curBI = curBI.add(diffBI);
+      byte [] padded = curBI.toByteArray();
+      if (padded[1] == 0)
+        padded = tail(padded, padded.length - 2);
+      else
+        padded = tail(padded, padded.length - 1);
+      result[i] = padded;
+    }
+    return result;
+  }
+
+  /**
    * @param t operands
    * @return Array of byte arrays made from passed array of Text
    */

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatNMappersPerRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatNMappersPerRegion.java?rev=1181607&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatNMappersPerRegion.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatNMappersPerRegion.java Tue Oct 11 02:27:04 2011
@@ -0,0 +1,244 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobInProgress;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests TableInputFormat with varying numbers of mappers per region.
+ */
+public class TestTableInputFormatNMappersPerRegion {
+
+  static final String SPECULATIVE_EXECUTION = "mapred.map.tasks.speculative.execution";
+  static final Log LOG = LogFactory.getLog(TestTableInputFormatNMappersPerRegion.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+
+  private static HTable table = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.enableDebug(TableInputFormat.class);
+    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
+  }
+
+  /**
+   * Mapper that runs the count.
+   */
+  public static class RowCounterMapper
+  extends TableMapper<ImmutableBytesWritable, Result> {
+
+    /** Counter enumeration to count the actual rows. */
+    public static enum Counters {ROWS}
+
+    /**
+     * Maps the data.
+     *
+     * @param row  The current table row key.
+     * @param values  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result values,
+      Context context)
+    throws IOException {
+      for (KeyValue value: values.list()) {
+        if (value.getValue().length > 0) {
+          context.getCounter(Counters.ROWS).increment(1);
+          break;
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Tests whether TableInputFormat works correctly when number of mappers
+   * per region is set to 1.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testOneMapperPerRegion()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("testOneMapperPerRegion", 1, 25);
+  }
+
+  /**
+   * Tests when number of mappers is set to 3.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testThreeMappersPerRegion()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("testThreeMappersPerRegion", 3, 25);
+  }
+
+  /**
+   * Tests the scenario where there is only one region. Expecting fallback to
+   * one mapper per region.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testOnTableWithOneRegion()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("testOnTableWithOneRegion", 5, 1);
+  }
+
+  /**
+   * Tests the scenario where there are only two regions. Expecting fallback
+   * to one mapper per region.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testOnTableWithTwoRegions()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("testOnTableWithTwoRegions", 5, 2);
+  }
+
+  /**
+   * Tests whether the framework correctly detects illegal inputs.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testIllegalNumberOfMappers()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    try {
+      testScan("testZeroMapper", 0, 25);
+      assertTrue("Should not be able to take 0 as number of mappers " +
+          "per region", false);
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+    try {
+      testScan("testNegOneMapper", -1, 25);
+      assertTrue("Should not be able to take -1 as number of mappers " +
+          "per region", false);
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  // If numRegions > 2, this creates 25 regions, rather than numRegions regions.
+  private void testScan(String tableName, int numMappersPerRegion, int numRegions)
+  throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = tableName + "_job";
+    Configuration c = TEST_UTIL.getConfiguration();
+    // Force disable speculative maps
+    c.setBoolean(SPECULATIVE_EXECUTION, false);
+    // Set the number of maps per region for this job
+    c.setInt(TableInputFormat.MAPPERS_PER_REGION, numMappersPerRegion);
+    // Create and fill table
+    table = TEST_UTIL.createTable(Bytes.toBytes(tableName), INPUT_FAMILY);
+    // Store the number of regions opened
+    int regionsOpened = 0;
+    if (numRegions == 2) {
+      byte[][] startKeys = { HConstants.EMPTY_START_ROW, Bytes.toBytes("mmm") };
+      regionsOpened = TEST_UTIL.createMultiRegions(c, table, INPUT_FAMILY, startKeys);
+    } else if (numRegions == 1) {
+      regionsOpened = 1;
+    } else if (numRegions > 2) {
+      regionsOpened = TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+    } else {
+      throw new IllegalArgumentException("Expected a positive number of regions, but got " +
+          numRegions);
+    }
+    // Store the number of rows loaded to the table
+    int rowsLoaded = TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    scan.setFilter(new FirstKeyOnlyFilter());
+    Job job = new Job(c, jobName);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+    job.waitForCompletion(true);
+    assertTrue(job.isComplete());
+    // Get statistics
+    Counters counters = job.getCounters();
+    long totalMapCount = counters
+        .findCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS).getValue();
+    long totalRowCount = counters
+        .findCounter(RowCounterMapper.Counters.ROWS).getValue();
+    int actualNumMappersPerRegion = (numRegions > 2) ? numMappersPerRegion : 1;
+    assertEquals("Tried to open " + actualNumMappersPerRegion * regionsOpened +
+        " maps but got " + totalMapCount,
+        actualNumMappersPerRegion * regionsOpened, totalMapCount);
+    assertEquals("Supposed to find " + rowsLoaded + " rows but got " + totalRowCount,
+        rowsLoaded, totalRowCount);
+  }
+}