You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2014/02/12 18:13:07 UTC
svn commit: r1567688 - in
/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase:
mapreduce/LoadIncrementalHFiles.java util/HBaseFsck.java
Author: ndimiduk
Date: Wed Feb 12 17:13:07 2014
New Revision: 1567688
URL: http://svn.apache.org/r1567688
Log:
HBASE-10500 Some tools OOM when BucketCache is enabled
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1567688&r1=1567687&r2=1567688&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Feb 12 17:13:07 2014
@@ -100,7 +100,6 @@ public class LoadIncrementalHFiles exten
private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
static final AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
- private Configuration cfg;
public static final String NAME = "completebulkload";
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
@@ -113,7 +112,10 @@ public class LoadIncrementalHFiles exten
public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
- this.cfg = conf;
+ // make a copy, just to be sure we're not overriding someone else's config
+ setConf(HBaseConfiguration.create(getConf()));
+ // disable blockcache for tool invocation, see HBASE-10500
+ getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
this.hbAdmin = new HBaseAdmin(conf);
this.userProvider = UserProvider.instantiate(conf);
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
@@ -204,8 +206,8 @@ public class LoadIncrementalHFiles exten
}
// initialize thread pools
- int nrThreads = cfg.getInt("hbase.loadincremental.threads.max",
- Runtime.getRuntime().availableProcessors());
+ int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
+ Runtime.getRuntime().availableProcessors());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("LoadIncrementalHFiles-%1$d");
ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
@@ -252,7 +254,7 @@ public class LoadIncrementalHFiles exten
//If using secure bulk load
//prepare staging directory and token
if (userProvider.isHBaseSecurityEnabled()) {
- FileSystem fs = FileSystem.get(cfg);
+ FileSystem fs = FileSystem.get(getConf());
//This condition is here for unit testing
//Since delegation token doesn't work in mini cluster
if (userProvider.isHadoopSecurityEnabled()) {
@@ -278,7 +280,7 @@ public class LoadIncrementalHFiles exten
+ count + " with " + queue.size() + " files remaining to group or split");
}
- int maxRetries = cfg.getInt("hbase.bulkload.retries.number", 0);
+ int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 0);
if (maxRetries != 0 && count >= maxRetries) {
LOG.error("Retry attempted " + count + " times without completing, bailing out");
return;
@@ -300,7 +302,7 @@ public class LoadIncrementalHFiles exten
if (userProvider.isHBaseSecurityEnabled()) {
if (userToken != null && !hasForwardedToken) {
try {
- userToken.cancel(cfg);
+ userToken.cancel(getConf());
} catch (Exception e) {
LOG.warn("Failed to cancel HDFS delegation token.", e);
}
@@ -579,7 +581,7 @@ public class LoadIncrementalHFiles exten
//from the staging directory back to original location
//in user directory
if(secureClient != null && !success) {
- FileSystem fs = FileSystem.get(cfg);
+ FileSystem fs = FileSystem.get(getConf());
for(Pair<byte[], String> el : famPaths) {
Path hfileStagingPath = null;
Path hfileOrigPath = new Path(el.getSecond());
@@ -811,21 +813,22 @@ public class LoadIncrementalHFiles exten
return -1;
}
- String dirPath = args[0];
+ String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
- boolean tableExists = this.doesTableExist(tableName);
+ boolean tableExists = this.doesTableExist(tableName);
if (!tableExists) this.createTable(tableName,dirPath);
Path hfofDir = new Path(dirPath);
- HTable table = new HTable(this.cfg, tableName);
+ HTable table = new HTable(getConf(), tableName);
doBulkLoad(hfofDir, table);
return 0;
}
public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
+ Configuration conf = HBaseConfiguration.create();
+ int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), args);
System.exit(ret);
}
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1567688&r1=1567687&r2=1567688&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Wed Feb 12 17:13:07 2014
@@ -269,6 +269,10 @@ public class HBaseFsck extends Configure
public HBaseFsck(Configuration conf) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException, ClassNotFoundException {
super(conf);
+ // make a copy, just to be sure we're not overriding someone else's config
+ setConf(HBaseConfiguration.create(getConf()));
+ // disable blockcache for tool invocation, see HBASE-10500
+ getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
errors = getErrorReporter(conf);
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);