Posted to commits@hbase.apache.org by nd...@apache.org on 2015/06/15 21:26:00 UTC
[1/2] hbase git commit: HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path, HTable) doesn't handle unmanaged connections when using SecureBulkLoad
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 c27bcd205 -> 7c6c916a4
HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7c6c916a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7c6c916a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7c6c916a
Branch: refs/heads/branch-1.1
Commit: 7c6c916a4c9e74499bcd26cd68b9c0dbf7679c8b
Parents: 521f6a9
Author: Nick Dimiduk <nd...@apache.org>
Authored: Sun Jun 14 15:44:49 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Mon Jun 15 12:21:09 2015 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 30 ++++++--
.../mapreduce/TestLoadIncrementalHFiles.java | 79 +++++++++++---------
2 files changed, 69 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
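For context, the failure mode this patch addresses looks roughly like the sketch below (illustrative only, not part of the commit; the class name, table name, and HFile path are made up). An HTable constructed directly from a Configuration is backed by a managed connection, and with SecureBulkLoad enabled the loader could not derive the unmanaged connection it needs from such a table:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadRepro {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // An HTable built this way is backed by a *managed* connection.
    HTable table = new HTable(conf, TableName.valueOf("mytable"));
    try {
      // Under SecureBulkLoad, doBulkLoad needs an unmanaged connection
      // internally; before this patch it could not obtain one here.
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/staging/hfiles"), table);
    } finally {
      table.close();
    }
  }
}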
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c6c916a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 827699b..417deec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -291,15 +292,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws TableNotFoundException, IOException
{
Admin admin = null;
+ Table t = table;
+ Connection conn = table.getConnection();
+ boolean closeConnWhenFinished = false;
try {
- try {
- admin = table.getConnection().getAdmin();
- } catch (NeedUnmanagedConnectionException ex) {
- admin = new HBaseAdmin(table.getConfiguration());
+ if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+ LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
+ // can only use unmanaged connections from here on out.
+ conn = ConnectionFactory.createConnection(table.getConfiguration());
+ t = conn.getTable(table.getName());
+ closeConnWhenFinished = true;
+ if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+ throw new RuntimeException("Failed to create unmanaged connection.");
+ }
+ admin = conn.getAdmin();
+ } else {
+ admin = conn.getAdmin();
+ }
+ try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
+ doBulkLoad(hfofDir, admin, t, rl);
}
- doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
} finally {
- admin.close();
+ if (admin != null) admin.close();
+ if (closeConnWhenFinished) {
+ t.close();
+ conn.close();
+ }
}
}
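With the hunk above applied, the loader detects a managed connection via ClusterConnection.isManaged() and transparently swaps in an unmanaged one, closing it and the temporary table when finished. Callers can still avoid the extra connection entirely by passing a table obtained from an unmanaged connection, roughly as follows (a sketch; the class name, table name, and path are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadUnmanaged {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Tables from ConnectionFactory are unmanaged, so the loader can
    // use the caller's connection directly; no fallback is needed.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         HTable table = (HTable) conn.getTable(TableName.valueOf("mytable"))) {
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/staging/hfiles"), table);
    }
  }
}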
http://git-wip-us.apache.org/repos/asf/hbase/blob/7c6c916a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 0b4dc56..21f4e10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -232,47 +234,56 @@ public class TestLoadIncrementalHFiles {
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
- Path dir = util.getDataTestDirOnTestFS(testName);
- FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(FAMILY));
- int hfileIdx = 0;
- for (byte[][] range : hfileRanges) {
- byte[] from = range[0];
- byte[] to = range[1];
- HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
- + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
- }
- int expectedRows = hfileIdx * 1000;
+ for (boolean managed : new boolean[] { true, false }) {
+ Path dir = util.getDataTestDirOnTestFS(testName);
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+ }
+ int expectedRows = hfileIdx * 1000;
- if (preCreateTable) {
- util.getHBaseAdmin().createTable(htd, tableSplitKeys);
- }
+ if (preCreateTable) {
+ util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+ }
- final TableName tableName = htd.getTableName();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- String [] args= {dir.toString(), tableName.toString()};
- loader.run(args);
+ final TableName tableName = htd.getTableName();
+ if (!util.getHBaseAdmin().tableExists(tableName)) {
+ util.getHBaseAdmin().createTable(htd);
+ }
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- Table table = new HTable(util.getConfiguration(), tableName);
- try {
- assertEquals(expectedRows, util.countRows(table));
- } finally {
- table.close();
- }
+ if (managed) {
+ try (HTable table = new HTable(util.getConfiguration(), tableName)) {
+ loader.doBulkLoad(dir, table);
+ assertEquals(expectedRows, util.countRows(table));
+ }
+ } else {
+ try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+ HTable table = (HTable) conn.getTable(tableName)) {
+ loader.doBulkLoad(dir, table);
+ }
+ }
- // verify staging folder has been cleaned up
- Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
- if(fs.exists(stagingBasePath)) {
- FileStatus[] files = fs.listStatus(stagingBasePath);
- for(FileStatus file : files) {
- assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
- file.getPath().getName() != "DONOTERASE");
+ // verify staging folder has been cleaned up
+ Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+ if (fs.exists(stagingBasePath)) {
+ FileStatus[] files = fs.listStatus(stagingBasePath);
+ for (FileStatus file : files) {
+ assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+ file.getPath().getName() != "DONOTERASE");
+ }
}
- }
- util.deleteTable(tableName);
+ util.deleteTable(tableName);
+ }
}
/**
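The reworked test now drives both legs of the new branch. Condensed, the pattern is (a sketch mirroring the hunk above, with conf, util, loader, dir, tableName, and expectedRows as in the test):

for (boolean managed : new boolean[] { true, false }) {
  if (managed) {
    // Managed connection: exercises the loader's new fallback to a
    // freshly created unmanaged connection.
    try (HTable table = new HTable(conf, tableName)) {
      loader.doBulkLoad(dir, table);
      assertEquals(expectedRows, util.countRows(table));
    }
  } else {
    // Unmanaged connection: exercises the direct path.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         HTable table = (HTable) conn.getTable(tableName)) {
      loader.doBulkLoad(dir, table);
    }
  }
}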
[2/2] hbase git commit: Revert "HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path, HTable) doesn't handle unmanaged connections when using SecureBulkLoad (Nick Dimiduk)"
Posted by nd...@apache.org.
Revert "HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad (Nick Dimiduk)"
This reverts commit 5e403cb3d92133b15faf955d16c6dbafed960c6f.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/521f6a97
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/521f6a97
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/521f6a97
Branch: refs/heads/branch-1.1
Commit: 521f6a9789e30e472f96810541caffeb5de9a06f
Parents: c27bcd2
Author: Nick Dimiduk <nd...@apache.org>
Authored: Sun Jun 14 15:08:57 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Mon Jun 15 12:21:09 2015 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 28 +++++---------------
1 file changed, 6 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/521f6a97/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a8d13b2..827699b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -292,30 +291,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws TableNotFoundException, IOException
{
Admin admin = null;
- Table t = table;
- Connection conn = table.getConnection();
- boolean closeConnWhenFinished = false;
try {
- if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
- LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
- // can only use unmanaged connections from here on out.
- conn = ConnectionFactory.createConnection(table.getConfiguration());
- t = conn.getTable(table.getName());
- closeConnWhenFinished = true;
- if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
- throw new RuntimeException("Failed to create unmanaged connection.");
- }
- admin = conn.getAdmin();
- } else {
- admin = conn.getAdmin();
+ try {
+ admin = table.getConnection().getAdmin();
+ } catch (NeedUnmanagedConnectionException ex) {
+ admin = new HBaseAdmin(table.getConfiguration());
}
- doBulkLoad(hfofDir, admin, t, conn.getRegionLocator(t.getName()));
+ doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
} finally {
- if (admin != null) admin.close();
- if (closeConnWhenFinished) {
- t.close();
- conn.close();
- }
+ admin.close();
}
}
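Net effect of the pair: the revert removes the earlier cut of the fix (5e403cb3) so the corrected version, 7c6c916a above with its try-with-resources RegionLocator, lands cleanly on branch-1.1. The guard it introduces generalizes to any code handed an HTable of unknown provenance, roughly as follows (a sketch against the branch-1.1 client API; the class and method names here are made up):

import java.io.IOException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;

public class ConnectionGuard {
  // Returns an unmanaged connection usable for bulk load; if the
  // table's own connection is managed, a new one is created and the
  // caller is responsible for closing it.
  public static Connection unmanagedConnectionFor(HTable table) throws IOException {
    Connection conn = table.getConnection();
    if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
      conn = ConnectionFactory.createConnection(table.getConfiguration());
    }
    return conn;
  }
}

The patch itself tracks ownership with a closeConnWhenFinished flag, since the caller of a helper like this cannot otherwise tell whether the returned connection is one it must close.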