You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2015/06/15 21:26:00 UTC

[1/2] hbase git commit: HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path, HTable) doesn't handle unmanaged connections when using SecureBulkLoad

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 c27bcd205 -> 7c6c916a4


HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7c6c916a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7c6c916a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7c6c916a

Branch: refs/heads/branch-1.1
Commit: 7c6c916a4c9e74499bcd26cd68b9c0dbf7679c8b
Parents: 521f6a9
Author: Nick Dimiduk <nd...@apache.org>
Authored: Sun Jun 14 15:44:49 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Mon Jun 15 12:21:09 2015 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 30 ++++++--
 .../mapreduce/TestLoadIncrementalHFiles.java    | 79 +++++++++++---------
 2 files changed, 69 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7c6c916a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 827699b..417deec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -291,15 +292,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     throws TableNotFoundException, IOException
   {
     Admin admin = null;
+    Table t = table;
+    Connection conn = table.getConnection();
+    boolean closeConnWhenFinished = false;
     try {
-      try {
-        admin = table.getConnection().getAdmin();
-      } catch (NeedUnmanagedConnectionException ex) {
-        admin = new HBaseAdmin(table.getConfiguration());
+      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
+        // can only use unmanaged connections from here on out.
+        conn = ConnectionFactory.createConnection(table.getConfiguration());
+        t = conn.getTable(table.getName());
+        closeConnWhenFinished = true;
+        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+          throw new RuntimeException("Failed to create unmanaged connection.");
+        }
+        admin = conn.getAdmin();
+      } else {
+        admin = conn.getAdmin();
+      }
+      try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
+        doBulkLoad(hfofDir, admin, t, rl);
       }
-      doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
     } finally {
-      admin.close();
+      if (admin != null) admin.close();
+      if (closeConnWhenFinished) {
+        t.close();
+        conn.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7c6c916a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 0b4dc56..21f4e10 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -232,47 +234,56 @@ public class TestLoadIncrementalHFiles {
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
 
-    int hfileIdx = 0;
-    for (byte[][] range : hfileRanges) {
-      byte[] from = range[0];
-      byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
-    }
-    int expectedRows = hfileIdx * 1000;
+    for (boolean managed : new boolean[] { true, false }) {
+      Path dir = util.getDataTestDirOnTestFS(testName);
+      FileSystem fs = util.getTestFileSystem();
+      dir = dir.makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+      int hfileIdx = 0;
+      for (byte[][] range : hfileRanges) {
+        byte[] from = range[0];
+        byte[] to = range[1];
+        HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+            + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      }
+      int expectedRows = hfileIdx * 1000;
 
-    if (preCreateTable) {
-      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
-    }
+      if (preCreateTable) {
+        util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+      }
 
-    final TableName tableName = htd.getTableName();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+      final TableName tableName = htd.getTableName();
+      if (!util.getHBaseAdmin().tableExists(tableName)) {
+        util.getHBaseAdmin().createTable(htd);
+      }
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
 
-    Table table = new HTable(util.getConfiguration(), tableName);
-    try {
-      assertEquals(expectedRows, util.countRows(table));
-    } finally {
-      table.close();
-    }
+      if (managed) {
+        try (HTable table = new HTable(util.getConfiguration(), tableName)) {
+          loader.doBulkLoad(dir, table);
+          assertEquals(expectedRows, util.countRows(table));
+        }
+      } else {
+        try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+            HTable table = (HTable) conn.getTable(tableName)) {
+          loader.doBulkLoad(dir, table);
+        }
+      }
 
-    // verify staging folder has been cleaned up
-    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
-    if(fs.exists(stagingBasePath)) {
-      FileStatus[] files = fs.listStatus(stagingBasePath);
-      for(FileStatus file : files) {
-        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
-          file.getPath().getName() != "DONOTERASE");
+      // verify staging folder has been cleaned up
+      Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+      if (fs.exists(stagingBasePath)) {
+        FileStatus[] files = fs.listStatus(stagingBasePath);
+        for (FileStatus file : files) {
+          assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+              file.getPath().getName() != "DONOTERASE");
+        }
       }
-    }
 
-    util.deleteTable(tableName);
+      util.deleteTable(tableName);
+    }
   }
 
   /**


[2/2] hbase git commit: Revert "HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path, HTable) doesn't handle unmanaged connections when using SecureBulkLoad (Nick Dimiduk)"

Posted by nd...@apache.org.
Revert "HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad (Nick Dimiduk)"

This reverts commit 5e403cb3d92133b15faf955d16c6dbafed960c6f.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/521f6a97
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/521f6a97
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/521f6a97

Branch: refs/heads/branch-1.1
Commit: 521f6a9789e30e472f96810541caffeb5de9a06f
Parents: c27bcd2
Author: Nick Dimiduk <nd...@apache.org>
Authored: Sun Jun 14 15:08:57 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Mon Jun 15 12:21:09 2015 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 28 +++++---------------
 1 file changed, 6 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/521f6a97/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a8d13b2..827699b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -292,30 +291,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     throws TableNotFoundException, IOException
   {
     Admin admin = null;
-    Table t = table;
-    Connection conn = table.getConnection();
-    boolean closeConnWhenFinished = false;
     try {
-      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
-        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
-        // can only use unmanaged connections from here on out.
-        conn = ConnectionFactory.createConnection(table.getConfiguration());
-        t = conn.getTable(table.getName());
-        closeConnWhenFinished = true;
-        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
-          throw new RuntimeException("Failed to create unmanaged connection.");
-        }
-        admin = conn.getAdmin();
-      } else {
-        admin = conn.getAdmin();
+      try {
+        admin = table.getConnection().getAdmin();
+      } catch (NeedUnmanagedConnectionException ex) {
+        admin = new HBaseAdmin(table.getConfiguration());
       }
-      doBulkLoad(hfofDir, admin, t, conn.getRegionLocator(t.getName()));
+      doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
     } finally {
-      if (admin != null) admin.close();
-      if (closeConnWhenFinished) {
-        t.close();
-        conn.close();
-      }
+      admin.close();
     }
   }