You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/09/20 19:59:30 UTC

svn commit: r1388141 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java

Author: ramkrishna
Date: Thu Sep 20 17:59:30 2012
New Revision: 1388141

URL: http://svn.apache.org/viewvc?rev=1388141&view=rev
Log:
HBASE-6698 Refactor checkAndPut and checkAndDelete to use doMiniBatchMutation
(Priya)

Submitted by: Priya
Reviewed by: Ram, Stack, Ted, Lars

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1388141&r1=1388140&r2=1388141&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Sep 20 17:59:30 2012
@@ -1783,8 +1783,7 @@ public class HRegion implements HeapSize
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
-        prepareDelete(delete);
-        internalDelete(delete, delete.getClusterId(), writeToWAL);
+        doBatchMutate(delete, lid);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1801,11 +1800,11 @@ public class HRegion implements HeapSize
    */
   void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
       boolean writeToWAL) throws IOException {
-    Delete delete = new Delete();
+    Delete delete = new Delete(HConstants.EMPTY_BYTE_ARRAY);
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setWriteToWAL(writeToWAL);
-    internalDelete(delete, clusterId, writeToWAL);
+    doBatchMutate(delete, null);
   }
 
   /**
@@ -1863,65 +1862,6 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * @param delete The Delete command
-   * @param clusterId UUID of the originating cluster (for replication).
-   * @param writeToWAL
-   * @throws IOException
-   */
-  private void internalDelete(Delete delete, UUID clusterId,
-      boolean writeToWAL) throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
-    /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
-        return;
-      }
-    }
-
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte [] byteNow = Bytes.toBytes(now);
-    boolean flush = false;
-
-    updatesLock.readLock().lock();
-    try {
-      prepareDeleteTimestamps(delete.getFamilyMap(), byteNow);
-
-      if (writeToWAL) {
-        // write/sync to WAL should happen before we touch memstore.
-        //
-        // If order is reversed, i.e. we write to memstore first, and
-        // for some reason fail to write/sync to commit log, the memstore
-        // will contain uncommitted transactions.
-        //
-        // bunch up all edits across all column families into a
-        // single WALEdit.
-        addFamilyMapToWALEdit(familyMap, walEdit);
-        this.log.append(regionInfo, this.htableDescriptor.getName(),
-            walEdit, clusterId, now, this.htableDescriptor);
-      }
-
-      // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
-      flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-
-    } finally {
-      this.updatesLock.readLock().unlock();
-    }
-    // do after lock
-    if (coprocessorHost != null) {
-      coprocessorHost.postDelete(delete, walEdit, writeToWAL);
-    }
-    final long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updateDeleteMetrics(familyMap.keySet(), after-now);
-
-    if (flush) {
-      // Request a cache flush.  Do it outside update lock.
-      requestFlush();
-    }
-  }
-
-  /**
    * @param put
    * @throws IOException
    */
@@ -1978,7 +1918,7 @@ public class HRegion implements HeapSize
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
-        internalPut(put, put.getClusterId(), writeToWAL);
+        doBatchMutate(put, lid);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -2444,7 +2384,6 @@ public class HRegion implements HeapSize
     }
 
     startRegionOperation();
-    this.writeRequestsCount.increment();
     try {
       RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
       Get get = new Get(row, lock);
@@ -2496,17 +2435,7 @@ public class HRegion implements HeapSize
         if (matches) {
           // All edits for the given row (across all column families) must
           // happen atomically.
-          //
-          // Using default cluster id, as this can only happen in the
-          // originating cluster. A slave cluster receives the result as a Put
-          // or Delete
-          if (isPut) {
-            internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
-          } else {
-            Delete d = (Delete)w;
-            prepareDelete(d);
-            internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
-          }
+          doBatchMutate((Mutation)w, lid);
           this.checkAndMutateChecksPassed.increment();
           return true;
         }
@@ -2520,6 +2449,18 @@ public class HRegion implements HeapSize
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private void doBatchMutate(Mutation mutation, Integer lid) throws IOException,
+      DoNotRetryIOException {
+    Pair<Mutation, Integer>[] mutateWithLocks = new Pair[] { new Pair<Mutation, Integer>(mutation,
+        lid) };
+    OperationStatus[] batchMutate = this.batchMutate(mutateWithLocks);
+    if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
+      throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
+    } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
+      throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
+    }
+  }
 
   /**
    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
@@ -2592,7 +2533,7 @@ public class HRegion implements HeapSize
    * @praram now
    * @throws IOException
    */
-  private void put(byte [] family, List<KeyValue> edits)
+  private void put(byte [] family, List<KeyValue> edits, Integer lid)
   throws IOException {
     Map<byte[], List<KeyValue>> familyMap;
     familyMap = new HashMap<byte[], List<KeyValue>>();
@@ -2602,70 +2543,9 @@ public class HRegion implements HeapSize
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
+    doBatchMutate(p, lid);
   }
-
-  /**
-   * Add updates first to the hlog (if writeToWal) and then add values to memstore.
-   * Warning: Assumption is caller has lock on passed in row.
-   * @param put The Put command
-   * @param clusterId UUID of the originating cluster (for replication).
-   * @param writeToWAL if true, then we should write to the log
-   * @throws IOException
-   */
-  private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
-    /* run pre put hook outside of lock to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
-        return;
-      }
-    }
-
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] byteNow = Bytes.toBytes(now);
-    boolean flush = false;
-
-    this.updatesLock.readLock().lock();
-    try {
-      checkFamilies(familyMap.keySet());
-      checkTimestamps(familyMap, now);
-      updateKVTimestamps(familyMap.values(), byteNow);
-      // write/sync to WAL should happen before we touch memstore.
-      //
-      // If order is reversed, i.e. we write to memstore first, and
-      // for some reason fail to write/sync to commit log, the memstore
-      // will contain uncommitted transactions.
-      if (writeToWAL) {
-        addFamilyMapToWALEdit(familyMap, walEdit);
-        this.log.append(regionInfo, this.htableDescriptor.getName(),
-            walEdit, clusterId, now, this.htableDescriptor);
-      } else {
-        recordPutWithoutWal(familyMap);
-      }
-
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
-      flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-    } finally {
-      this.updatesLock.readLock().unlock();
-    }
-
-    if (coprocessorHost != null) {
-      coprocessorHost.postPut(put, walEdit, writeToWAL);
-    }
-
-    // do after lock
-    final long after = EnvironmentEdgeManager.currentTimeMillis();
-    this.opMetrics.updatePutMetrics(familyMap.keySet(), after - now);
-
-
-    if (flush) {
-      // Request a cache flush.  Do it outside update lock.
-      requestFlush();
-    }
-  }
-
+ 
   /**
    * Atomically apply the given map of family->edits to the memstore.
    * This handles the consistency control on its own, but the caller
@@ -4019,7 +3899,7 @@ public class HRegion implements HeapSize
       edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
         HConstants.META_VERSION_QUALIFIER, now,
         Bytes.toBytes(HConstants.META_VERSION)));
-      meta.put(HConstants.CATALOG_FAMILY, edits);
+      meta.put(HConstants.CATALOG_FAMILY, edits, lid);
     } finally {
       meta.releaseRowLock(lid);
     }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java?rev=1388141&r1=1388140&r2=1388141&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java Thu Sep 20 17:59:30 2012
@@ -189,10 +189,10 @@ public class TestRegionServerMetrics {
     assertTimeVaryingMetricCount(1, TABLE_NAME, cf, regionName, "append_");
 
     // One delete where the cf is known
-    assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "delete_");
+    assertTimeVaryingMetricCount(1, TABLE_NAME, cf, null, "multidelete_");
 
     // two deletes in the region.
-    assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "delete_");
+    assertTimeVaryingMetricCount(2, TABLE_NAME, null, regionName, "multidelete_");
 
     // Three gets. one for gets. One for append. One for increment.
     assertTimeVaryingMetricCount(3, TABLE_NAME, cf, regionName, "get_");