You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/07/15 01:36:02 UTC
hbase git commit: Revert "HBASE-12596 bulkload needs to follow locality (Victor Xu)"
Repository: hbase
Updated Branches:
refs/heads/0.98 94d5e7acc -> 3094ecb26
Revert "HBASE-12596 bulkload needs to follow locality (Victor Xu)"
This reverts commit bcb425f38f75359f662ea8cf49b3cd989998b780.
Reason for revert: compilation problems with the Java 6 and Hadoop 1 build profiles.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3094ecb2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3094ecb2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3094ecb2
Branch: refs/heads/0.98
Commit: 3094ecb26611f93fe4bebe45c211d2eb3590f09b
Parents: 94d5e7a
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue Jul 14 16:34:35 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Jul 14 16:34:35 2015 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/HFileOutputFormat2.java | 84 ++-------------
.../hbase/mapreduce/TestHFileOutputFormat.java | 9 --
.../hbase/mapreduce/TestHFileOutputFormat2.java | 103 +++++--------------
3 files changed, 33 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3094ecb2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index ae7e53d..91e5b22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
@@ -41,13 +40,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -108,15 +105,6 @@ public class HFileOutputFormat2
public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.datablock.encoding";
- /**
- * Keep locality while generating HFiles for bulkload. See HBASE-12596
- */
- public static final String LOCALITY_SENSITIVE_CONF_KEY =
- "hbase.bulkload.locality.sensitive.enabled";
- private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
- private static final String OUTPUT_TABLE_NAME_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.table.name";
-
@Override
public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -200,51 +188,7 @@ public class HFileOutputFormat2
// create a new HLog writer, if necessary
if (wl == null || wl.writer == null) {
- if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
- HRegionLocation loc = null;
- HTable htable = null;
- try {
- htable = new HTable(conf, tableName);
- loc = htable.getRegionLocation(rowKey);
- } catch (Throwable e) {
- LOG.warn("there's something wrong when locating rowkey: " +
- Bytes.toString(rowKey), e);
- loc = null;
- } finally {
- if(null != htable) {
- htable.close();
- }
- }
-
- if (null == loc) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("failed to get region location, so use default writer: " +
- Bytes.toString(rowKey));
- }
- wl = getNewWriter(family, conf, null);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
- }
- InetSocketAddress initialIsa =
- new InetSocketAddress(loc.getHostname(), loc.getPort());
- if (initialIsa.isUnresolved()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
- + loc.getPort() + ", so use default writer");
- }
- wl = getNewWriter(family, conf, null);
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
- }
- wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
- }
- }
- } else {
- wl = getNewWriter(family, conf, null);
- }
+ wl = getNewWriter(family, conf);
}
// we now have the proper HLog writer. full steam ahead
@@ -274,8 +218,8 @@ public class HFileOutputFormat2
* @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
- private WriterLength getNewWriter(byte[] family, Configuration conf,
- InetSocketAddress[] favoredNodes) throws IOException {
+ private WriterLength getNewWriter(byte[] family, Configuration conf)
+ throws IOException {
WriterLength wl = new WriterLength();
Path familydir = new Path(outputdir, Bytes.toString(family));
Algorithm compression = compressionMap.get(family);
@@ -297,18 +241,10 @@ public class HFileOutputFormat2
contextBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = contextBuilder.build();
- if (null == favoredNodes) {
- wl.writer =
- new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
- .withOutputDir(familydir).withBloomType(bloomType)
- .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build();
- } else {
- wl.writer =
- new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
- .withOutputDir(familydir).withBloomType(bloomType)
- .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
- .withFavoredNodes(favoredNodes).build();
- }
+ wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+ .withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(KeyValue.COMPARATOR)
+ .withFileContext(hFileContext).build();
this.writers.put(family, wl);
return wl;
@@ -448,12 +384,6 @@ public class HFileOutputFormat2
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
- if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- // record this table name for creating writer by favored nodes
- LOG.info("bulkload locality sensitive enabled");
- conf.set(OUTPUT_TABLE_NAME_CONF_KEY, table.getName().getNameAsString());
- }
-
// Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3094ecb2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index b8c0fc4..f3157f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -335,7 +335,6 @@ public class TestHFileOutputFormat {
job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
HTable table = Mockito.mock(HTable.class);
setupMockStartKeys(table);
- setupMockTableName(table);
HFileOutputFormat.configureIncrementalLoad(job, table);
assertEquals(job.getNumReduceTasks(), 4);
}
@@ -777,11 +776,6 @@ public class TestHFileOutputFormat {
Mockito.doReturn(mockKeys).when(table).getStartKeys();
}
- private void setupMockTableName(HTable table) throws IOException {
- TableName mockTableName = TableName.valueOf("mock_table");
- Mockito.doReturn(mockTableName).when(table).getName();
- }
-
/**
* Test that {@link HFileOutputFormat} RecordWriter uses compression and
* bloom filter settings from the column family descriptor
@@ -810,9 +804,6 @@ public class TestHFileOutputFormat {
// pollutes the GZip codec pool with an incompatible compressor.
conf.set("io.seqfile.compression.type", "NONE");
conf.set("hbase.fs.tmp.dir", dir.toString());
- // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
- conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
-
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3094ecb2/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index bff8494..28ca56f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
@@ -68,7 +67,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
@@ -338,7 +336,6 @@ public class TestHFileOutputFormat2 {
job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
HTable table = Mockito.mock(HTable.class);
setupMockStartKeys(table);
- setupMockTableName(table);
HFileOutputFormat2.configureIncrementalLoad(job, table);
assertEquals(job.getNumReduceTasks(), 4);
}
@@ -357,66 +354,39 @@ public class TestHFileOutputFormat2 {
@Test
public void testMRIncrementalLoad() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoad\n");
- doIncrementalLoadTest(false, false);
+ doIncrementalLoadTest(false);
}
@Test
public void testMRIncrementalLoadWithSplit() throws Exception {
LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
- doIncrementalLoadTest(true, false);
+ doIncrementalLoadTest(true);
}
- /**
- * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the
- * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
- * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to
- * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
- * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region
- * locality features more easily.
- */
- @Test
- public void testMRIncrementalLoadWithLocality() throws Exception {
- LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
- doIncrementalLoadTest(false, true);
- doIncrementalLoadTest(true, true);
- }
-
- private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality)
- throws Exception {
+ private void doIncrementalLoadTest(
+ boolean shouldChangeRegions) throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
- conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
- int hostCount = 1;
- int regionNum = 5;
- if (shouldKeepLocality) {
- // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
- // explicit hostnames parameter just like MiniDFSCluster does.
- hostCount = 3;
- regionNum = 20;
- }
-
- byte[][] startKeys = generateRandomStartKeys(regionNum);
- String[] hostnames = new String[hostCount];
- for (int i = 0; i < hostCount; ++i) {
- hostnames[i] = "datanode_" + i;
- }
-
- Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
+ byte[][] startKeys = generateRandomStartKeys(5);
HBaseAdmin admin = null;
try {
- util.startMiniCluster(1, hostCount, hostnames);
+ util.startMiniCluster();
+ Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
admin = new HBaseAdmin(conf);
HTable table = util.createTable(TABLE_NAME, FAMILIES);
- assertEquals("Should start with empty table", 0, util.countRows(table));
- int numRegions =
- util.createMultiRegions(util.getConfiguration(), table, FAMILIES[0], startKeys);
- assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
+ assertEquals("Should start with empty table",
+ 0, util.countRows(table));
+ int numRegions = util.createMultiRegions(
+ util.getConfiguration(), table, FAMILIES[0], startKeys);
+ assertEquals("Should make 5 regions", numRegions, 5);
// Generate the bulk load files
util.startMiniMapReduceCluster();
runIncrementalPELoad(conf, table, testDir);
// This doesn't write into the table, just makes files
- assertEquals("HFOF should not touch actual table", 0, util.countRows(table));
+ assertEquals("HFOF should not touch actual table",
+ 0, util.countRows(table));
+
// Make sure that a directory was created for every CF
int dir = 0;
@@ -433,16 +403,17 @@ public class TestHFileOutputFormat2 {
if (shouldChangeRegions) {
LOG.info("Changing regions in table");
admin.disableTable(table.getTableName());
- while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
- .isRegionsInTransition()) {
+ while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
+ getRegionStates().isRegionsInTransition()) {
Threads.sleep(200);
LOG.info("Waiting on table to finish disabling");
}
byte[][] newStartKeys = generateRandomStartKeys(15);
- util.createMultiRegions(util.getConfiguration(), table, FAMILIES[0], newStartKeys);
+ util.createMultiRegions(
+ util.getConfiguration(), table, FAMILIES[0], newStartKeys);
admin.enableTable(table.getTableName());
- while (table.getRegionLocations().size() != 15
- || !admin.isTableAvailable(table.getTableName())) {
+ while (table.getRegionLocations().size() != 15 ||
+ !admin.isTableAvailable(table.getTableName())) {
Thread.sleep(200);
LOG.info("Waiting for new region assignment to happen");
}
@@ -453,8 +424,8 @@ public class TestHFileOutputFormat2 {
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
- assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
- util.countRows(table));
+ assertEquals("LoadIncrementalHFiles should put expected data in table",
+ expectedRows, util.countRows(table));
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
for (Result res : results) {
@@ -468,17 +439,6 @@ public class TestHFileOutputFormat2 {
results.close();
String tableDigestBefore = util.checksumRows(table);
- // Check region locality
- HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
- for (HRegion region : util.getHBaseCluster().getRegions(TABLE_NAME)) {
- hbd.add(region.getHDFSBlocksDistribution());
- }
- for (String hostname : hostnames) {
- float locality = hbd.getBlockLocalityIndex(hostname);
- LOG.info("locality of [" + hostname + "]: " + locality);
- assertEquals(100, (int) (locality * 100));
- }
-
// Cause regions to reopen
admin.disableTable(TABLE_NAME);
while (!admin.isTableDisabled(TABLE_NAME)) {
@@ -487,11 +447,9 @@ public class TestHFileOutputFormat2 {
}
admin.enableTable(TABLE_NAME);
util.waitTableAvailable(TABLE_NAME.getName());
- assertEquals("Data should remain after reopening of regions", tableDigestBefore,
- util.checksumRows(table));
+ assertEquals("Data should remain after reopening of regions",
+ tableDigestBefore, util.checksumRows(table));
} finally {
- util.deleteTable(TABLE_NAME);
- testDir.getFileSystem(conf).delete(testDir, true);
if (admin != null) admin.close();
util.shutdownMiniMapReduceCluster();
util.shutdownMiniCluster();
@@ -820,11 +778,6 @@ public class TestHFileOutputFormat2 {
Mockito.doReturn(mockKeys).when(table).getStartKeys();
}
- private void setupMockTableName(HTable table) throws IOException {
- TableName mockTableName = TableName.valueOf("mock_table");
- Mockito.doReturn(mockTableName).when(table).getName();
- }
-
/**
* Test that {@link HFileOutputFormat2} RecordWriter uses compression and
* bloom filter settings from the column family descriptor
@@ -853,9 +806,6 @@ public class TestHFileOutputFormat2 {
// pollutes the GZip codec pool with an incompatible compressor.
conf.set("io.seqfile.compression.type", "NONE");
conf.set("hbase.fs.tmp.dir", dir.toString());
- // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
- conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
-
Job job = new Job(conf, "testLocalMRIncrementalLoad");
job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
setupRandomGeneratorMapper(job);
@@ -934,8 +884,7 @@ public class TestHFileOutputFormat2 {
* Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
* will be thrown.
*/
- @Ignore("Flakey: See HBASE-9051")
- @Test
+ @Ignore ("Flakey: See HBASE-9051") @Test
public void testExcludeAllFromMinorCompaction() throws Exception {
Configuration conf = util.getConfiguration();
conf.setInt("hbase.hstore.compaction.min", 2);