Posted to commits@hbase.apache.org by ap...@apache.org on 2015/07/15 01:36:02 UTC

hbase git commit: Revert "HBASE-12596 bulkload needs to follow locality (Victor Xu)"

Repository: hbase
Updated Branches:
  refs/heads/0.98 94d5e7acc -> 3094ecb26


Revert "HBASE-12596 bulkload needs to follow locality (Victor Xu)"

This reverts commit bcb425f38f75359f662ea8cf49b3cd989998b780.

Reason for revert: Java 6 and Hadoop 1 profile compilation problems.
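
For context, the reverted patch (HBASE-12596) taught HFileOutputFormat2 to look up the region location for each writer's first row key and pass the hosting region server as an HDFS favored node, so bulk-loaded HFiles would be written local to the region that serves them. The behavior was gated by a configuration key and enabled by default. A minimal sketch of how a job could have opted out while the patch was in place; the key and method names are taken from the diff below, and "my_table" is a hypothetical table name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    // Disable the locality-sensitive writer path added by HBASE-12596
    // (hbase.bulkload.locality.sensitive.enabled defaulted to true).
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
    Job job = new Job(conf, "bulkload");
    HTable table = new HTable(conf, "my_table");
    HFileOutputFormat2.configureIncrementalLoad(job, table);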


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3094ecb2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3094ecb2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3094ecb2

Branch: refs/heads/0.98
Commit: 3094ecb26611f93fe4bebe45c211d2eb3590f09b
Parents: 94d5e7a
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue Jul 14 16:34:35 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Jul 14 16:34:35 2015 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/HFileOutputFormat2.java     |  84 ++-------------
 .../hbase/mapreduce/TestHFileOutputFormat.java  |   9 --
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 103 +++++--------------
 3 files changed, 33 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3094ecb2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index ae7e53d..91e5b22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -41,13 +40,11 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -108,15 +105,6 @@ public class HFileOutputFormat2
   public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
-  /**
-   * Keep locality while generating HFiles for bulkload. See HBASE-12596
-   */
-  public static final String LOCALITY_SENSITIVE_CONF_KEY =
-      "hbase.bulkload.locality.sensitive.enabled";
-  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-  private static final String OUTPUT_TABLE_NAME_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.table.name";
-
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -200,51 +188,7 @@ public class HFileOutputFormat2
 
         // create a new HLog writer, if necessary
         if (wl == null || wl.writer == null) {
-          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
-            HRegionLocation loc = null;
-            HTable htable = null;
-            try {
-              htable = new HTable(conf, tableName);
-              loc = htable.getRegionLocation(rowKey);
-            } catch (Throwable e) {
-              LOG.warn("there's something wrong when locating rowkey: " +
-                Bytes.toString(rowKey), e);
-              loc = null;
-            } finally {
-              if(null != htable) {
-                htable.close();
-              }
-            }
-
-            if (null == loc) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("failed to get region location, so use default writer: " +
-                  Bytes.toString(rowKey));
-              }
-              wl = getNewWriter(family, conf, null);
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
-              }
-              InetSocketAddress initialIsa =
-                  new InetSocketAddress(loc.getHostname(), loc.getPort());
-              if (initialIsa.isUnresolved()) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
-                      + loc.getPort() + ", so use default writer");
-                }
-                wl = getNewWriter(family, conf, null);
-              } else {
-                if(LOG.isDebugEnabled()) {
-                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
-                }
-                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
-              }
-            }
-          } else {
-            wl = getNewWriter(family, conf, null);
-          }
+          wl = getNewWriter(family, conf);
         }
 
         // we now have the proper HLog writer. full steam ahead
@@ -274,8 +218,8 @@ public class HFileOutputFormat2
        * @return A WriterLength, containing a new StoreFile.Writer.
        * @throws IOException
        */
-      private WriterLength getNewWriter(byte[] family, Configuration conf,
-          InetSocketAddress[] favoredNodes) throws IOException {
+      private WriterLength getNewWriter(byte[] family, Configuration conf)
+          throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
         Algorithm compression = compressionMap.get(family);
@@ -297,18 +241,10 @@ public class HFileOutputFormat2
         contextBuilder.withDataBlockEncoding(encoding);
         HFileContext hFileContext = contextBuilder.build();
                                     
-        if (null == favoredNodes) {
-          wl.writer =
-              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build();
-        } else {
-          wl.writer =
-              new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
-                  .withFavoredNodes(favoredNodes).build();
-        }
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+            .withOutputDir(familydir).withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
+            .withFileContext(hFileContext).build();
 
         this.writers.put(family, wl);
         return wl;
@@ -448,12 +384,6 @@ public class HFileOutputFormat2
         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
         KeyValueSerialization.class.getName());
 
-    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-      // record this table name for creating writer by favored nodes
-      LOG.info("bulkload locality sensitive enabled");
-      conf.set(OUTPUT_TABLE_NAME_CONF_KEY, table.getName().getNameAsString());
-    }
-
     // Use table's region boundaries for TOP split points.
     LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
     List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
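
Condensed, the writer-selection logic removed above worked as follows (a sketch reassembled from the deleted lines, not a drop-in replacement; conf, rowKey, family, wl, and getNewWriter are as in the surrounding diff):

    // Resolve the region server hosting this row's region; if the
    // address resolves, create the writer with that host as a favored
    // node so HDFS places block replicas on the serving region server.
    HTable htable = new HTable(conf,
        conf.get("hbase.mapreduce.hfileoutputformat.table.name"));
    HRegionLocation loc = htable.getRegionLocation(rowKey);
    htable.close();
    InetSocketAddress isa =
        new InetSocketAddress(loc.getHostname(), loc.getPort());
    wl = isa.isUnresolved()
        ? getNewWriter(family, conf, null) // unresolvable: default placement
        : getNewWriter(family, conf, new InetSocketAddress[] { isa });

The favored-nodes branch also wrapped the FileSystem in an HFileSystem and called withFavoredNodes(...) on the StoreFile.WriterBuilder, which is why reverting it changes the getNewWriter signature as well.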

http://git-wip-us.apache.org/repos/asf/hbase/blob/3094ecb2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index b8c0fc4..f3157f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -335,7 +335,6 @@ public class TestHFileOutputFormat  {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
-    setupMockTableName(table);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -777,11 +776,6 @@ public class TestHFileOutputFormat  {
     Mockito.doReturn(mockKeys).when(table).getStartKeys();
   }
 
-  private void setupMockTableName(HTable table) throws IOException {
-    TableName mockTableName = TableName.valueOf("mock_table");
-    Mockito.doReturn(mockTableName).when(table).getName();
-  }
-
   /**
    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -810,9 +804,6 @@ public class TestHFileOutputFormat  {
       // pollutes the GZip codec pool with an incompatible compressor.
       conf.set("io.seqfile.compression.type", "NONE");
       conf.set("hbase.fs.tmp.dir", dir.toString());
-      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
-      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
-
       Job job = new Job(conf, "testLocalMRIncrementalLoad");
       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3094ecb2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index bff8494..28ca56f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HadoopShims;
 import org.apache.hadoop.hbase.KeyValue;
@@ -68,7 +67,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -338,7 +336,6 @@ public class TestHFileOutputFormat2  {
     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
     HTable table = Mockito.mock(HTable.class);
     setupMockStartKeys(table);
-    setupMockTableName(table);
     HFileOutputFormat2.configureIncrementalLoad(job, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -357,66 +354,39 @@ public class TestHFileOutputFormat2  {
   @Test
   public void testMRIncrementalLoad() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoad\n");
-    doIncrementalLoadTest(false, false);
+    doIncrementalLoadTest(false);
   }
 
   @Test
   public void testMRIncrementalLoadWithSplit() throws Exception {
     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
-    doIncrementalLoadTest(true, false);
+    doIncrementalLoadTest(true);
   }
 
-  /**
-   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the
-   * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
-   * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to
-   * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
-   * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region
-   * locality features more easily.
-   */
-  @Test
-  public void testMRIncrementalLoadWithLocality() throws Exception {
-    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
-    doIncrementalLoadTest(false, true);
-    doIncrementalLoadTest(true, true);
-  }
-
-  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
-      throws Exception {
+  private void doIncrementalLoadTest(
+      boolean shouldChangeRegions) throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
-    conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
-    int hostCount = 1;
-    int regionNum = 5;
-    if (shouldKeepLocality) {
-      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
-      // explicit hostnames parameter just like MiniDFSCluster does.
-      hostCount = 3;
-      regionNum = 20;
-    }
-
-    byte[][] startKeys = generateRandomStartKeys(regionNum);
-    String[] hostnames = new String[hostCount];
-    for (int i = 0; i < hostCount; ++i) {
-      hostnames[i] = "datanode_" + i;
-    }
-
-    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+    byte[][] startKeys = generateRandomStartKeys(5);
     HBaseAdmin admin = null;
     try {
-      util.startMiniCluster(1, hostCount, hostnames);
+      util.startMiniCluster();
+      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
       admin = new HBaseAdmin(conf);
       HTable table = util.createTable(TABLE_NAME, FAMILIES);
-      assertEquals("Should start with empty table", 0, util.countRows(table));
-      int numRegions =
-          util.createMultiRegions(util.getConfiguration(), table, FAMILIES[0], startKeys);
-      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
+      assertEquals("Should start with empty table",
+          0, util.countRows(table));
+      int numRegions = util.createMultiRegions(
+          util.getConfiguration(), table, FAMILIES[0], startKeys);
+      assertEquals("Should make 5 regions", numRegions, 5);
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
       runIncrementalPELoad(conf, table, testDir);
       // This doesn't write into the table, just makes files
-      assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
+      assertEquals("HFOF should not touch actual table",
+          0, util.countRows(table));
+
 
       // Make sure that a directory was created for every CF
       int dir = 0;
@@ -433,16 +403,17 @@ public class TestHFileOutputFormat2  {
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
         admin.disableTable(table.getTableName());
-        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
-            .isRegionsInTransition()) {
+        while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
+            getRegionStates().isRegionsInTransition()) {
           Threads.sleep(200);
           LOG.info("Waiting on table to finish disabling");
         }
         byte[][] newStartKeys = generateRandomStartKeys(15);
-        util.createMultiRegions(util.getConfiguration(), table, FAMILIES[0], newStartKeys);
+        util.createMultiRegions(
+            util.getConfiguration(), table, FAMILIES[0], newStartKeys);
         admin.enableTable(table.getTableName());
-        while (table.getRegionLocations().size() != 15
-            || !admin.isTableAvailable(table.getTableName())) {
+        while (table.getRegionLocations().size() != 15 ||
+            !admin.isTableAvailable(table.getTableName())) {
           Thread.sleep(200);
           LOG.info("Waiting for new region assignment to happen");
         }
@@ -453,8 +424,8 @@ public class TestHFileOutputFormat2  {
 
       // Ensure data shows up
       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
-      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
-        util.countRows(table));
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows, util.countRows(table));
       Scan scan = new Scan();
       ResultScanner results = table.getScanner(scan);
       for (Result res : results) {
@@ -468,17 +439,6 @@ public class TestHFileOutputFormat2  {
       results.close();
       String tableDigestBefore = util.checksumRows(table);
 
-      // Check region locality
-      HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
-      for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) {
-        hbd.add(region.getHDFSBlocksDistribution());
-      }
-      for (String hostname : hostnames) {
-        float locality = hbd.getBlockLocalityIndex(hostname);
-        LOG.info("locality of [" + hostname + "]: " + locality);
-        assertEquals(100, (int) (locality * 100));
-      }
-
       // Cause regions to reopen
       admin.disableTable(TABLE_NAME);
       while (!admin.isTableDisabled(TABLE_NAME)) {
@@ -487,11 +447,9 @@ public class TestHFileOutputFormat2  {
       }
       admin.enableTable(TABLE_NAME);
       util.waitTableAvailable(TABLE_NAME.getName());
-      assertEquals("Data should remain after reopening of regions", tableDigestBefore,
-        util.checksumRows(table));
+      assertEquals("Data should remain after reopening of regions",
+          tableDigestBefore, util.checksumRows(table));
     } finally {
-      util.deleteTable(TABLE_NAME);
-      testDir.getFileSystem(conf).delete(testDir, true);
       if (admin != null) admin.close();
       util.shutdownMiniMapReduceCluster();
       util.shutdownMiniCluster();
@@ -820,11 +778,6 @@ public class TestHFileOutputFormat2  {
     Mockito.doReturn(mockKeys).when(table).getStartKeys();
   }
 
-  private void setupMockTableName(HTable table) throws IOException {
-    TableName mockTableName = TableName.valueOf("mock_table");
-    Mockito.doReturn(mockTableName).when(table).getName();
-  }
-
   /**
    * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
    * bloom filter settings from the column family descriptor
@@ -853,9 +806,6 @@ public class TestHFileOutputFormat2  {
       // pollutes the GZip codec pool with an incompatible compressor.
       conf.set("io.seqfile.compression.type", "NONE");
       conf.set("hbase.fs.tmp.dir", dir.toString());
-      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
-      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
-
       Job job = new Job(conf, "testLocalMRIncrementalLoad");
       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
       setupRandomGeneratorMapper(job);
@@ -934,8 +884,7 @@ public class TestHFileOutputFormat2  {
    * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
    * will be thrown.
    */
-  @Ignore("Flakey: See HBASE-9051")
-  @Test
+  @Ignore ("Flakey: See HBASE-9051") @Test
   public void testExcludeAllFromMinorCompaction() throws Exception {
     Configuration conf = util.getConfiguration();
     conf.setInt("hbase.hstore.compaction.min", 2);