You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2014/02/12 20:42:27 UTC

svn commit: r1567736 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: jxiang
Date: Wed Feb 12 19:42:27 2014
New Revision: 1567736

URL: http://svn.apache.org/r1567736
Log:
HBASE-7849 Provide administrative limits around bulkloads of files into a single region

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1567736&r1=1567735&r2=1567736&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Feb 12 19:42:27 2014
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,7 +70,6 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -102,7 +103,11 @@ public class LoadIncrementalHFiles exten
   private HBaseAdmin hbAdmin;
 
   public static final String NAME = "completebulkload";
+  public static final String MAX_FILES_PER_REGION_PER_FAMILY
+    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+
+  private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
 
   private boolean hasForwardedToken;
@@ -119,6 +124,7 @@ public class LoadIncrementalHFiles exten
     this.hbAdmin = new HBaseAdmin(conf);
     this.userProvider = UserProvider.instantiate(conf);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
   }
 
   private void usage() {
@@ -291,6 +297,12 @@ public class LoadIncrementalHFiles exten
         Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
             pool, queue, startEndKeys);
 
+        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+          // Error is logged inside checkHFilesCountPerRegionPerFamily.
+          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+            + " hfiles to one family of one region");
+        }
+
         bulkLoadPhase(table, conn, pool, queue, regionGroups);
 
         // NOTE: The next iteration's split / group could happen in parallel to
@@ -378,6 +390,31 @@ public class LoadIncrementalHFiles exten
     }
   }
 
+  /** Returns false (after logging) if any region would get too many hfiles for one family. */
+  private boolean checkHFilesCountPerRegionPerFamily(
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
+    for (Entry<ByteBuffer,
+        ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+      // Key by ByteBuffer, not byte[]: arrays hash/compare by identity, not by content.
+      HashMap<ByteBuffer, MutableInt> filesMap = new HashMap<ByteBuffer, MutableInt>();
+      for (LoadQueueItem lqi: e.getValue()) {
+        MutableInt count = filesMap.get(ByteBuffer.wrap(lqi.family));
+        if (count == null) {
+          count = new MutableInt();
+          filesMap.put(ByteBuffer.wrap(lqi.family), count);
+        }
+        count.increment();
+        if (count.intValue() > maxFilesPerRegionPerFamily) {
+          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
+            + " hfiles to family " + Bytes.toStringBinary(lqi.family)
+            + " of region with start key " + Bytes.toStringBinary(e.getKey()));
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   /**
    * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
    * bulk load region targets.

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1567736&r1=1567735&r2=1567736&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Wed Feb 12 19:42:27 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduc
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.TreeMap;
@@ -60,7 +61,8 @@ import org.junit.experimental.categories
 public class TestLoadIncrementalHFiles {
   private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
   private static final byte[] FAMILY = Bytes.toBytes("myfam");
-  private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
 
   private static final byte[][] SPLIT_KEYS = new byte[][] {
     Bytes.toBytes("ddd"),
@@ -75,6 +77,9 @@ public class TestLoadIncrementalHFiles {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().setInt( // small cap (4) so a test can exceed it with a few hfiles
+      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
     util.startMiniCluster();
   }
 
@@ -355,6 +360,29 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  @Test
+  public void testLoadTooManyHFiles() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testLoadTooManyHFiles");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
 
+    // Write one more hfile than the configured cap, all in one family with the same key range.
+    byte[] from = Bytes.toBytes("begin");
+    byte[] to = Bytes.toBytes("end");
+    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + i), FAMILY, QUALIFIER, from, to, 1000);
+    }
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String [] args= {dir.toString(), "mytable_testLoadTooManyHFiles"};
+    try {
+      loader.run(args);
+      fail("Bulk loading too many files should fail");
+    } catch (IOException ie) {
+      assertTrue(ie.getMessage().contains("Trying to load more than "
+        + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
+    }
+  }
 }
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java?rev=1567736&r1=1567735&r2=1567736&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java Wed Feb 12 19:42:27 2014
@@ -48,6 +48,9 @@ public class TestSecureLoadIncrementalHF
       HadoopSecurityEnabledUserProviderForTesting.class);
     // setup configuration
     SecureTestUtil.enableSecurity(util.getConfiguration());
+    util.getConfiguration().setInt(
+        LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+        MAX_FILES_PER_REGION_PER_FAMILY);
 
     util.startMiniCluster();