You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2015/09/22 07:03:44 UTC

[13/50] [abbrv] hive git commit: HIVE-11389 hbase import should allow partial imports and should work in parallel (gates)

HIVE-11389 hbase import should allow partial imports and should work in parallel (gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0fa45e4a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0fa45e4a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0fa45e4a

Branch: refs/heads/master
Commit: 0fa45e4a562fc2586b1ef06a88e9c186a0835316
Parents: 7e7f461
Author: Alan Gates <ga...@hortonworks.com>
Authored: Fri Jul 31 11:07:00 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Fri Jul 31 11:07:00 2015 -0700

----------------------------------------------------------------------
 .../hive/metastore/hbase/TestHBaseImport.java   | 557 +++++++++++++++++--
 .../hive/metastore/hbase/HBaseImport.java       | 435 +++++++++++++--
 2 files changed, 899 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0fa45e4a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
index 7bdff18..1ac10f0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -38,12 +41,16 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Test that import from an RDBMS based metastore works
@@ -52,6 +59,13 @@ public class TestHBaseImport extends HBaseIntegrationTests {
 
   private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName());
 
+  private static final String[] tableNames = new String[] {"allnonparttable", "allparttable"};
+  private static final String[] partVals = new String[] {"na", "emea", "latam", "apac"};
+  private static final String[] funcNames = new String[] {"allfunc1", "allfunc2"};
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @BeforeClass
   public static void startup() throws Exception {
     HBaseIntegrationTests.startMiniCluster();
@@ -69,25 +83,396 @@ public class TestHBaseImport extends HBaseIntegrationTests {
   }
 
   @Test
-  public void doImport() throws Exception {
-    RawStore rdbms = new ObjectStore();
+  public void importAll() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"alldb1", "alldb2"};
+    String[] roles = new String[] {"allrole1", "allrole2"};
+    String[] tokenIds = new String[] {"alltokenid1", "alltokenid2"};
+    String[] tokens = new String[] {"alltoken1", "alltoken2"};
+    String[] masterKeys = new String[] {"allmk1", "allmk2"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+
+    HBaseImport importer = new HBaseImport("-a");
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    for (int i = 0; i < roles.length; i++) {
+      Role role = store.getRole(roles[i]);
+      Assert.assertNotNull(role);
+      Assert.assertEquals(roles[i], role.getRoleName());
+    }
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(baseNumRoles + 2, store.listRoleNames().size());
+
+    for (int i = 0; i < dbNames.length; i++) {
+      Database db = store.getDatabase(dbNames[i]);
+      Assert.assertNotNull(db);
+      // check one random value in the db rather than every value
+      Assert.assertEquals("file:/tmp", db.getLocationUri());
+
+      Table table = store.getTable(db.getName(), tableNames[0]);
+      Assert.assertNotNull(table);
+      Assert.assertEquals(now, table.getLastAccessTime());
+      Assert.assertEquals("input", table.getSd().getInputFormat());
+
+      table = store.getTable(db.getName(), tableNames[1]);
+      Assert.assertNotNull(table);
+
+      for (int j = 0; j < partVals.length; j++) {
+        Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j]));
+        Assert.assertNotNull(part);
+        Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+      }
+
+      Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size());
+      Assert.assertEquals(2, store.getAllTables(dbNames[i]).size());
+
+      Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size());
+      for (int j = 0; j < funcNames.length; j++) {
+        Assert.assertNotNull(store.getFunction(dbNames[i], funcNames[j]));
+      }
+    }
+
+    Assert.assertEquals(baseNumDbs + 2, store.getAllDatabases().size());
+
+    // I can't test total number of tokens or master keys because the import grabs all and copies
+    // them, which means it grabs the ones imported by importSecurity test (if it's already run).
+    // Depending on it already running would make the tests order dependent, which junit doesn't
+    // guarantee.
+    for (int i = 0; i < tokenIds.length; i++) {
+      Assert.assertEquals(tokens[i], store.getToken(tokenIds[i]));
+    }
+    String[] hbaseKeys = store.getMasterKeys();
+    Set<String> keys = new HashSet<>(Arrays.asList(hbaseKeys));
+    for (int i = 0; i < masterKeys.length; i++) {
+      Assert.assertTrue(keys.contains(masterKeys[i]));
+    }
+  }
+
+  @Test
+  public void importOneDb() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"onedbdb1", "onedbdb2"};
+    String[] roles = new String[] {"onedbrole1", "onedbrole2"};
+    String[] tokenIds = new String[] {"onedbtokenid1", "onedbtokenid2"};
+    String[] tokens = new String[] {"onedbtoken1", "onedbtoken2"};
+    String[] masterKeys = new String[] {"onedbmk1", "onedbmk2"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+    int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+        store.getAllTokenIdentifiers().size();
+    int baseNumKeys =  store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+    HBaseImport importer = new HBaseImport("-d", dbNames[0]);
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+    Database db = store.getDatabase(dbNames[0]);
+    Assert.assertNotNull(db);
+    // check one random value in the db rather than every value
+    Assert.assertEquals("file:/tmp", db.getLocationUri());
+
+    Table table = store.getTable(db.getName(), tableNames[0]);
+    Assert.assertNotNull(table);
+    Assert.assertEquals(now, table.getLastAccessTime());
+    Assert.assertEquals("input", table.getSd().getInputFormat());
+
+    table = store.getTable(db.getName(), tableNames[1]);
+    Assert.assertNotNull(table);
+
+    for (int j = 0; j < partVals.length; j++) {
+      Partition part = store.getPartition(dbNames[0], tableNames[1], Arrays.asList(partVals[j]));
+      Assert.assertNotNull(part);
+      Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+    }
+
+    Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size());
+    Assert.assertEquals(2, store.getAllTables(dbNames[0]).size());
+
+    Assert.assertEquals(2, store.getFunctions(dbNames[0], "*").size());
+    for (int j = 0; j < funcNames.length; j++) {
+      Assert.assertNotNull(store.getFunction(dbNames[0], funcNames[j]));
+    }
+
+    Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+    Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+    String[] hbaseKeys = store.getMasterKeys();
+    Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+
+    // Have to do this last as it will throw an exception
+    thrown.expect(NoSuchObjectException.class);
+    store.getDatabase(dbNames[1]);
+  }
+
+  @Test
+  public void importOneFunc() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"onefuncdb1", "onefuncdb2"};
+    String[] roles = new String[] {"onefuncrole1", "onefuncrole2"};
+    String[] tokenIds = new String[] {"onefunctokenid1", "onefunctokenid2"};
+    String[] tokens = new String[] {"onefunctoken1", "onefunctoken2"};
+    String[] masterKeys = new String[] {"onefuncmk1", "onefuncmk2"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+    int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+        store.getAllTokenIdentifiers().size();
+    int baseNumKeys =  store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+    // Create the database so I can put the function in it.
+    store.createDatabase(
+        new Database(dbNames[0], "no description", "file:/tmp", emptyParameters));
+
+    HBaseImport importer = new HBaseImport("-f", dbNames[0] + "." + funcNames[0]);
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+    Database db = store.getDatabase(dbNames[0]);
+    Assert.assertNotNull(db);
+
+    Assert.assertEquals(0, store.getAllTables(dbNames[0]).size());
+    Assert.assertEquals(1, store.getFunctions(dbNames[0], "*").size());
+    Assert.assertNotNull(store.getFunction(dbNames[0], funcNames[0]));
+    Assert.assertNull(store.getFunction(dbNames[0], funcNames[1]));
+
+    Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+    Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+    String[] hbaseKeys = store.getMasterKeys();
+    Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+  }
+
+  @Test
+  public void importOneTableNonPartitioned() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"onetabdb1", "onetabdb2"};
+    String[] roles = new String[] {"onetabrole1", "onetabrole2"};
+    String[] tokenIds = new String[] {"onetabtokenid1", "onetabtokenid2"};
+    String[] tokens = new String[] {"onetabtoken1", "onetabtoken2"};
+    String[] masterKeys = new String[] {"onetabmk1", "onetabmk2"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+    int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+        store.getAllTokenIdentifiers().size();
+    int baseNumKeys =  store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+    // Create the database so I can put the table in it.
+    store.createDatabase(
+        new Database(dbNames[0], "no description", "file:/tmp", emptyParameters));
+
+    HBaseImport importer = new HBaseImport("-t", dbNames[0] + "." + tableNames[0]);
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+    Database db = store.getDatabase(dbNames[0]);
+    Assert.assertNotNull(db);
+
+    Table table = store.getTable(db.getName(), tableNames[0]);
+    Assert.assertNotNull(table);
+    Assert.assertEquals(1, store.getAllTables(db.getName()).size());
+    Assert.assertNull(store.getTable(db.getName(), tableNames[1]));
+
+    Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size());
+    Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+    Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+    String[] hbaseKeys = store.getMasterKeys();
+    Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+
+  }
+
+  @Test
+  public void importOneTablePartitioned() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"onetabpartdb1", "onetabpartodb2"};
+    String[] roles = new String[] {"onetabpartorole1", "onetabpartorole2"};
+    String[] tokenIds = new String[] {"onetabpartotokenid1", "onetabpartotokenid2"};
+    String[] tokens = new String[] {"onetabpartotoken1", "onetabpartotoken2"};
+    String[] masterKeys = new String[] {"onetabpartomk1", "onetabpartomk2"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+    int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+        store.getAllTokenIdentifiers().size();
+    int baseNumKeys =  store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+    // Create the database so I can put the table in it.
+    store.createDatabase(
+        new Database(dbNames[0], "no description", "file:/tmp", emptyParameters));
+
+    HBaseImport importer = new HBaseImport("-t", dbNames[0] + "." + tableNames[1]);
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+    Database db = store.getDatabase(dbNames[0]);
+    Assert.assertNotNull(db);
+
+    Table table = store.getTable(db.getName(), tableNames[1]);
+    Assert.assertNotNull(table);
+    Assert.assertEquals(1, store.getAllTables(db.getName()).size());
+
+    for (int j = 0; j < partVals.length; j++) {
+      Partition part = store.getPartition(dbNames[0], tableNames[1], Arrays.asList(partVals[j]));
+      Assert.assertNotNull(part);
+      Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+    }
+    Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size());
+
+    Assert.assertNull(store.getTable(db.getName(), tableNames[0]));
+
+    Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size());
+    Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+    Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+    String[] hbaseKeys = store.getMasterKeys();
+    Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+  }
+
+  @Test
+  public void importSecurity() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"securitydb1", "securitydb2"};
+    String[] roles = new String[] {"securityrole1", "securityrole2"};
+    String[] tokenIds = new String[] {"securitytokenid1", "securitytokenid2"};
+    String[] tokens = new String[] {"securitytoken1", "securitytoken2"};
+    String[] masterKeys = new String[] {"securitymk1", "securitymk2"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+
+    HBaseImport importer = new HBaseImport("-k");
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+    Assert.assertEquals(baseNumDbs, store.getAllDatabases().size());
+
+    // I can't test total number of tokens or master keys because the import grabs all and copies
+    // them, which means it grabs the ones imported by importAll test (if it's already run).
+    // Depending on it already running would make the tests order dependent, which junit doesn't
+    // guarantee.
+    for (int i = 0; i < tokenIds.length; i++) {
+      Assert.assertEquals(tokens[i], store.getToken(tokenIds[i]));
+    }
+    String[] hbaseKeys = store.getMasterKeys();
+    Set<String> keys = new HashSet<>(Arrays.asList(hbaseKeys));
+    for (int i = 0; i < masterKeys.length; i++) {
+      Assert.assertTrue(keys.contains(masterKeys[i]));
+    }
+  }
+
+  // TODO test for bogus function name
+  // TODO test for bogus table name
+  // TODO test for non-existent items
+
+  @Test
+  public void importOneRole() throws Exception {
+    RawStore rdbms;
+    rdbms = new ObjectStore();
     rdbms.setConf(conf);
 
-    String[] dbNames = new String[] {"importdb1", "importdb2"};
-    String[] tableNames = new String[] {"nonparttable", "parttable"};
-    String[] partVals = new String[] {"na", "emea", "latam", "apac"};
-    String[] funcNames = new String[] {"func1", "func2"};
-    String[] roles = new String[] {"role1", "role2"};
+    String[] dbNames = new String[] {"oneroledb1", "oneroledb2"};
+    String[] roles = new String[] {"onerolerole1", "onerolerole2"};
+    String[] tokenIds = new String[] {"oneroletokenid1", "oneroletokenid2"};
+    String[] tokens = new String[] {"oneroletoken1", "oneroletoken2"};
+    String[] masterKeys = new String[] {"onerolemk1", "onerolemk2"};
     int now = (int)System.currentTimeMillis() / 1000;
 
+    setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+    int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+    int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+    int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+        store.getAllTokenIdentifiers().size();
+    int baseNumKeys =  store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+    HBaseImport importer = new HBaseImport("-r", roles[0]);
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    Role role = store.getRole(roles[0]);
+    Assert.assertNotNull(role);
+    Assert.assertEquals(roles[0], role.getRoleName());
+
+    // Make sure there aren't any extra roles
+    Assert.assertEquals(baseNumRoles + 1, store.listRoleNames().size());
+    Assert.assertEquals(baseNumDbs, store.getAllDatabases().size());
+
+    Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+    String[] hbaseKeys = store.getMasterKeys();
+    Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+
+    // Have to do this last as it will throw an exception
+    thrown.expect(NoSuchObjectException.class);
+    store.getRole(roles[1]);
+  }
+
+  private void setupObjectStore(RawStore rdbms, String[] roles, String[] dbNames,
+                                String[] tokenIds, String[] tokens, String[] masterKeys, int now)
+      throws MetaException, InvalidObjectException, NoSuchObjectException {
     for (int i = 0; i < roles.length; i++) {
       rdbms.addRole(roles[i], "me");
     }
 
     for (int i = 0; i < dbNames.length; i++) {
-      rdbms.createDatabase(new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+      rdbms.createDatabase(
+          new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
 
-      List<FieldSchema> cols = new ArrayList<FieldSchema>();
+      List<FieldSchema> cols = new ArrayList<>();
       cols.add(new FieldSchema("col1", "int", "nocomment"));
       SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
       StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
@@ -95,7 +480,7 @@ public class TestHBaseImport extends HBaseIntegrationTests {
       rdbms.createTable(new Table(tableNames[0], dbNames[i], "me", now, now, 0, sd, null,
           emptyParameters, null, null, null));
 
-      List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+      List<FieldSchema> partCols = new ArrayList<>();
       partCols.add(new FieldSchema("region", "string", ""));
       rdbms.createTable(new Table(tableNames[1], dbNames[i], "me", now, now, 0, sd, partCols,
           emptyParameters, null, null, null));
@@ -105,57 +490,149 @@ public class TestHBaseImport extends HBaseIntegrationTests {
         psd.setLocation("file:/tmp/region=" + partVals[j]);
         Partition part = new Partition(Arrays.asList(partVals[j]), dbNames[i], tableNames[1],
             now, now, psd, emptyParameters);
-        store.addPartition(part);
+        rdbms.addPartition(part);
       }
 
       for (String funcName : funcNames) {
-        store.createFunction(new Function(funcName, dbNames[i], "classname", "ownername",
-            PrincipalType.USER, (int)System.currentTimeMillis()/1000, FunctionType.JAVA,
+        LOG.debug("Creating new function " + dbNames[i] + "." + funcName);
+        rdbms.createFunction(new Function(funcName, dbNames[i], "classname", "ownername",
+            PrincipalType.USER, (int) System.currentTimeMillis() / 1000, FunctionType.JAVA,
             Arrays.asList(new ResourceUri(ResourceType.JAR, "uri"))));
       }
     }
+    for (int i = 0; i < tokenIds.length; i++) rdbms.addToken(tokenIds[i], tokens[i]);
+    for (int i = 0; i < masterKeys.length; i++) {
+      rdbms.addMasterKey(masterKeys[i]);
+    }
+  }
 
-    HBaseImport importer = new HBaseImport();
-    importer.setConnections(rdbms, store);
-    importer.run();
+  @Test
+  public void parallel() throws Exception {
+    int parallelFactor = 10;
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
 
-    for (int i = 0; i < roles.length; i++) {
-      Role role = store.getRole(roles[i]);
-      Assert.assertNotNull(role);
-      Assert.assertEquals(roles[i], role.getRoleName());
+    String[] dbNames = new String[] {"paralleldb1"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    for (int i = 0; i < dbNames.length; i++) {
+      rdbms.createDatabase(
+          new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+
+      List<FieldSchema> cols = new ArrayList<>();
+      cols.add(new FieldSchema("col1", "int", "nocomment"));
+      SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+      StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+          serde, null, null, emptyParameters);
+
+      List<FieldSchema> partCols = new ArrayList<>();
+      partCols.add(new FieldSchema("region", "string", ""));
+      for (int j = 0; j < parallelFactor; j++) {
+        rdbms.createTable(new Table("t" + j, dbNames[i], "me", now, now, 0, sd, partCols,
+            emptyParameters, null, null, null));
+        for (int k = 0; k < parallelFactor; k++) {
+          StorageDescriptor psd = new StorageDescriptor(sd);
+          psd.setLocation("file:/tmp/region=" + k);
+          Partition part = new Partition(Arrays.asList("p" + k), dbNames[i], "t" + j,
+              now, now, psd, emptyParameters);
+          rdbms.addPartition(part);
+        }
+      }
     }
-    // Make sure there aren't any extra roles
-    Assert.assertEquals(2, store.listRoleNames().size());
+
+    HBaseImport importer = new HBaseImport("-p", "2", "-b", "2", "-d", dbNames[0]);
+    importer.setConnections(rdbms, store);
+    importer.run();
 
     for (int i = 0; i < dbNames.length; i++) {
       Database db = store.getDatabase(dbNames[i]);
       Assert.assertNotNull(db);
-      // check one random value in the db rather than every value
-      Assert.assertEquals("file:/tmp", db.getLocationUri());
 
-      Table table = store.getTable(db.getName(), tableNames[0]);
-      Assert.assertNotNull(table);
-      Assert.assertEquals(now, table.getLastAccessTime());
-      Assert.assertEquals("input", table.getSd().getInputFormat());
+      for (int j = 0; j < parallelFactor; j++) {
+        Table table = store.getTable(db.getName(), "t" + j);
+        Assert.assertNotNull(table);
+        Assert.assertEquals(now, table.getLastAccessTime());
+        Assert.assertEquals("input", table.getSd().getInputFormat());
 
-      table = store.getTable(db.getName(), tableNames[1]);
-      Assert.assertNotNull(table);
+        for (int k = 0; k < parallelFactor; k++) {
+          Partition part =
+              store.getPartition(dbNames[i], "t" + j, Arrays.asList("p" + k));
+          Assert.assertNotNull(part);
+          Assert.assertEquals("file:/tmp/region=" + k, part.getSd().getLocation());
+        }
 
-      for (int j = 0; j < partVals.length; j++) {
-        Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j]));
-        Assert.assertNotNull(part);
-        Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+        Assert.assertEquals(parallelFactor, store.getPartitions(dbNames[i], "t" + j, -1).size());
       }
+      Assert.assertEquals(parallelFactor, store.getAllTables(dbNames[i]).size());
 
-      Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size());
-      Assert.assertEquals(2, store.getAllTables(dbNames[i]).size());
+    }
+  }
 
-      Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size());
-      for (int j = 0; j < funcNames.length; j++) {
-        Assert.assertNotNull(store.getFunction(dbNames[i], funcNames[j]));
+  // Same as the test above except we create 9 of everything instead of 10.  This is important
+  // because in using a batch size of 2 the previous test guarantees 10 /2 =5 , meaning we'll
+  // have 5 writes on the partition queue with exactly 2 entries.  In this test we'll handle the
+  // case where the last entry in the queue has fewer partitions.
+  @Test
+  public void parallelOdd() throws Exception {
+    int parallelFactor = 9;
+    RawStore rdbms;
+    rdbms = new ObjectStore();
+    rdbms.setConf(conf);
+
+    String[] dbNames = new String[] {"oddparalleldb1"};
+    int now = (int)System.currentTimeMillis() / 1000;
+
+    for (int i = 0; i < dbNames.length; i++) {
+      rdbms.createDatabase(
+          new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+
+      List<FieldSchema> cols = new ArrayList<>();
+      cols.add(new FieldSchema("col1", "int", "nocomment"));
+      SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+      StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+          serde, null, null, emptyParameters);
+
+      List<FieldSchema> partCols = new ArrayList<>();
+      partCols.add(new FieldSchema("region", "string", ""));
+      for (int j = 0; j < parallelFactor; j++) {
+        rdbms.createTable(new Table("t" + j, dbNames[i], "me", now, now, 0, sd, partCols,
+            emptyParameters, null, null, null));
+        for (int k = 0; k < parallelFactor; k++) {
+          StorageDescriptor psd = new StorageDescriptor(sd);
+          psd.setLocation("file:/tmp/region=" + k);
+          Partition part = new Partition(Arrays.asList("p" + k), dbNames[i], "t" + j,
+              now, now, psd, emptyParameters);
+          rdbms.addPartition(part);
+        }
       }
     }
 
-    Assert.assertEquals(2, store.getAllDatabases().size());
+    HBaseImport importer = new HBaseImport("-p", "2", "-b", "2", "-d", dbNames[0]);
+    importer.setConnections(rdbms, store);
+    importer.run();
+
+    for (int i = 0; i < dbNames.length; i++) {
+      Database db = store.getDatabase(dbNames[i]);
+      Assert.assertNotNull(db);
+
+      for (int j = 0; j < parallelFactor; j++) {
+        Table table = store.getTable(db.getName(), "t" + j);
+        Assert.assertNotNull(table);
+        Assert.assertEquals(now, table.getLastAccessTime());
+        Assert.assertEquals("input", table.getSd().getInputFormat());
+
+        for (int k = 0; k < parallelFactor; k++) {
+          Partition part =
+              store.getPartition(dbNames[i], "t" + j, Arrays.asList("p" + k));
+          Assert.assertNotNull(part);
+          Assert.assertEquals("file:/tmp/region=" + k, part.getSd().getLocation());
+        }
+
+        Assert.assertEquals(parallelFactor, store.getPartitions(dbNames[i], "t" + j, -1).size());
+      }
+      Assert.assertEquals(parallelFactor, store.getAllTables(dbNames[i]).size());
+
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0fa45e4a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
index e416b8a..e143de7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
@@ -19,11 +19,18 @@
 package org.apache.hadoop.hive.metastore.hbase;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Deadline;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -36,23 +43,41 @@ import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A tool to take the contents of an RDBMS based Hive metastore and import it into an HBase based
  * one.  To use this the config files for Hive configured to work with the RDBMS (that is,
- * including the JDBC string, etc.) and for HBase must be in the path.  This tool will then
+ * including the JDBC string, etc.) as well as HBase configuration files must be in the path.
+ * There should not be a hive-site.xml that specifies HBaseStore in the path.  This tool will then
  * handle connecting to the RDBMS via the {@link org.apache.hadoop.hive.metastore.ObjectStore}
  * and HBase via {@link org.apache.hadoop.hive.metastore.hbase.HBaseStore} and transferring the
  * data.
+ *
+ * This tool can import an entire metastore or only selected objects.  When selecting objects it
+ * is necessary to fully specify the object's name.  For example, if you want to import the table
+ * T in the default database it needs to be identified as default.T.  The same is true for
+ * functions.  When an object is specified, everything under that object will be imported (e.g.
+ * if you select database D, then all tables and functions in that database will be
+ * imported as well).
+ *
+ * At this point only tables and partitions are handled in parallel as it is assumed there are
+ * relatively few of everything else.
+ *
+ * Note that HBaseSchemaTool must have already been used to create the appropriate tables in HBase.
  */
 public class HBaseImport {
 
   static final private Log LOG = LogFactory.getLog(HBaseImport.class.getName());
 
   public static void main(String[] args) {
-    HBaseImport tool = new HBaseImport();
     try {
+      HBaseImport tool = new HBaseImport(args);
       tool.run();
     } catch (Exception e) {
       System.err.println("Caught exception " + e.getClass().getName() + " with message <" +
@@ -60,112 +85,399 @@ public class HBaseImport {
     }
   }
 
+  private ThreadLocal<RawStore> rdbmsStore = new ThreadLocal<RawStore>() {
+    @Override
+    protected RawStore initialValue() {
+      if (rdbmsConf == null) {
+        throw new RuntimeException("order violation, need to set rdbms conf first");
+      }
+      RawStore os = new ObjectStore();
+      os.setConf(rdbmsConf);
+      return os;
+    }
+  };
+
+  private ThreadLocal<RawStore> hbaseStore = new ThreadLocal<RawStore>() {
+    @Override
+    protected RawStore initialValue() {
+      if (hbaseConf == null) {
+        throw new RuntimeException("order violation, need to set hbase conf first");
+      }
+      RawStore hs = new HBaseStore();
+      hs.setConf(hbaseConf);
+      return hs;
+    }
+  };
+
   private Configuration rdbmsConf;
   private Configuration hbaseConf;
-  private RawStore rdbmsStore;
-  private RawStore hbaseStore;
   private List<Database> dbs;
-  private List<Table> tables;
+  private BlockingQueue<Table> partitionedTables;
+  private BlockingQueue<String[]> tableNameQueue;
+  private BlockingQueue<PartQueueEntry> partQueue;
+  private boolean writingToQueue, readersFinished;
+  private boolean doKerberos, doAll;
+  private List<String> rolesToImport, dbsToImport, tablesToImport, functionsToImport;
+  private int parallel;
+  private int batchSize;
 
   @VisibleForTesting
-  HBaseImport() {
-    dbs = new ArrayList<Database>();
-    tables = new ArrayList<Table>();
+  HBaseImport(String... args) throws ParseException {
+    Options options = new Options();
+
+    doAll = doKerberos = false;
+    parallel = 1;
+    batchSize = 1000;
+
+    options.addOption(OptionBuilder
+        .withLongOpt("all")
+        .withDescription("Import the full metastore")
+        .create('a'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("batchsize")
+        .withDescription("Number of partitions to read and write in a batch, defaults to 1000")
+            .hasArg()
+            .create('b'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("database")
+        .withDescription("Import a single database")
+        .hasArgs()
+        .create('d'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("help")
+        .withDescription("You're looking at it")
+        .create('h'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("function")
+        .withDescription("Import a single function")
+        .hasArgs()
+        .create('f'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("kerberos")
+        .withDescription("Import all kerberos related objects (master key, tokens)")
+        .create('k'));
+
+     options.addOption(OptionBuilder
+        .withLongOpt("parallel")
+        .withDescription("Parallel factor for loading (only applied to tables and partitions), " +
+            "defaults to 1")
+        .hasArg()
+        .create('p'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("role")
+        .withDescription("Import a single role")
+        .hasArgs()
+        .create('r'));
 
+   options.addOption(OptionBuilder
+        .withLongOpt("tables")
+        .withDescription("Import a single tables")
+        .hasArgs()
+        .create('t'));
+
+    CommandLine cli = new GnuParser().parse(options, args);
+
+    // Process help, if it was asked for, this must be done first
+    if (cli.hasOption('h')) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("hbaseschematool", options);
+      // returning here results in nothing else happening, because none of the other flags have
+      // been set.
+      return;
+    }
+
+    // Now process the other command line args
+    if (cli.hasOption('a')) {
+      doAll = true;
+    }
+    if (cli.hasOption('b')) {
+      batchSize = Integer.valueOf(cli.getOptionValue('b'));
+    }
+    if (cli.hasOption('d')) {
+      dbsToImport = Arrays.asList(cli.getOptionValues('d'));
+    }
+    if (cli.hasOption('f')) {
+      functionsToImport = Arrays.asList(cli.getOptionValues('f'));
+    }
+    if (cli.hasOption('p')) {
+      parallel = Integer.valueOf(cli.getOptionValue('p'));
+    }
+    if (cli.hasOption('r')) {
+      rolesToImport = Arrays.asList(cli.getOptionValues('r'));
+    }
+    if (cli.hasOption('k')) {
+      doKerberos = true;
+    }
+    if (cli.hasOption('t')) {
+      tablesToImport = Arrays.asList(cli.getOptionValues('t'));
+    }
+
+    dbs = new ArrayList<>();
+    // We don't want to bound the size of the table queue because we keep it all in memory
+    partitionedTables = new LinkedBlockingQueue<>();
+    tableNameQueue = new LinkedBlockingQueue<>();
+
+    // Bound the size of this queue so we don't get too much in memory.
+    partQueue = new ArrayBlockingQueue<>(parallel * 2);
   }
 
   @VisibleForTesting
   void run() throws MetaException, InstantiationException, IllegalAccessException,
-      NoSuchObjectException, InvalidObjectException {
+      NoSuchObjectException, InvalidObjectException, InterruptedException {
     // Order here is crucial, as you can't add tables until you've added databases, etc.
     init();
-    copyRoles();
-    copyDbs();
-    copyTables();
-    copyPartitions();
-    copyFunctions();
+    if (doAll || rolesToImport != null) {
+      copyRoles();
+    }
+    if (doAll || dbsToImport != null) {
+      copyDbs();
+    }
+    if (doAll || dbsToImport != null || tablesToImport != null) {
+      copyTables();
+      copyPartitions();
+    }
+    if (doAll || dbsToImport != null || functionsToImport != null) {
+      copyFunctions();
+    }
+    if (doAll || doKerberos) {
+      copyKerberos();
+    }
   }
 
   private void init() throws MetaException, IllegalAccessException, InstantiationException {
-    if (rdbmsStore != null) {
+    if (rdbmsConf != null) {
       // We've been configured for testing, so don't do anything here.
       return;
     }
-    rdbmsConf = new HiveConf();  // We're depending on having everything properly in the path
-    hbaseConf = new HiveConf();
+    // We're depending on having everything properly in the path
+    rdbmsConf = new HiveConf();
+    hbaseConf = new HiveConf();//
     HiveConf.setVar(hbaseConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
         HBaseStore.class.getName());
     HiveConf.setBoolVar(hbaseConf, HiveConf.ConfVars.METASTORE_FASTPATH, true);
 
     // First get a connection to the RDBMS based store
-    rdbmsStore = new ObjectStore();
-    rdbmsStore.setConf(rdbmsConf);
+    rdbmsStore.get().setConf(rdbmsConf);
 
     // Get a connection to the HBase based store
-    hbaseStore = new HBaseStore();
-    hbaseStore.setConf(hbaseConf);
-    // This will go create the tables if they don't exist
-    hbaseStore.verifySchema();
+    hbaseStore.get().setConf(hbaseConf);
   }
 
   private void copyRoles() throws NoSuchObjectException, InvalidObjectException, MetaException {
     screen("Copying roles");
-    for (String roleName : rdbmsStore.listRoleNames()) {
-      Role role = rdbmsStore.getRole(roleName);
+    List<String> toCopy = doAll ? rdbmsStore.get().listRoleNames() : rolesToImport;
+    for (String roleName : toCopy) {
+      Role role = rdbmsStore.get().getRole(roleName);
       screen("Copying role " + roleName);
-      hbaseStore.addRole(roleName, role.getOwnerName());
+      hbaseStore.get().addRole(roleName, role.getOwnerName());
     }
   }
 
   private void copyDbs() throws MetaException, NoSuchObjectException, InvalidObjectException {
     screen("Copying databases");
-    for (String dbName : rdbmsStore.getAllDatabases()) {
-      Database db = rdbmsStore.getDatabase(dbName);
+    List<String> toCopy = doAll ? rdbmsStore.get().getAllDatabases() : dbsToImport;
+    for (String dbName : toCopy) {
+      Database db = rdbmsStore.get().getDatabase(dbName);
       dbs.add(db);
       screen("Copying database " + dbName);
-      hbaseStore.createDatabase(db);
+      hbaseStore.get().createDatabase(db);
     }
   }
 
-  private void copyTables() throws MetaException, InvalidObjectException {
+  private void copyTables() throws MetaException, InvalidObjectException, InterruptedException {
     screen("Copying tables");
+
+    // Start the parallel threads that will copy the tables
+    Thread[] copiers = new Thread[parallel];
+    writingToQueue = true;
+    for (int i = 0; i < parallel; i++) {
+      copiers[i] = new TableCopier();
+      copiers[i].start();
+    }
+
+    // Put tables from the databases we copied into the queue
     for (Database db : dbs) {
       screen("Coyping tables in database " + db.getName());
-      for (String tableName : rdbmsStore.getAllTables(db.getName())) {
-        Table table = rdbmsStore.getTable(db.getName(), tableName);
-        tables.add(table);
-        screen("Copying table " + db.getName() + "." + tableName);
-        hbaseStore.createTable(table);
+      for (String tableName : rdbmsStore.get().getAllTables(db.getName())) {
+        tableNameQueue.put(new String[]{db.getName(), tableName});
+      }
+    }
+
+    // Now put any specifically requested tables into the queue
+    if (tablesToImport != null) {
+      for (String compoundTableName : tablesToImport) {
+        String[] tn = compoundTableName.split("\\.");
+        if (tn.length != 2) {
+          error(compoundTableName + " not in proper form.  Must be in form dbname.tablename.  " +
+              "Ignoring this table and continuing.");
+        } else {
+          tableNameQueue.put(new String[]{tn[0], tn[1]});
+        }
+      }
+    }
+    writingToQueue = false;
+
+    // Wait until we've finished adding all the tables
+    for (Thread copier : copiers) copier.join();
+ }
+
+  private class TableCopier extends Thread {
+    @Override
+    public void run() {
+      while (writingToQueue || tableNameQueue.size() > 0) {
+        try {
+          String[] name = tableNameQueue.poll(1, TimeUnit.SECONDS);
+          if (name != null) {
+            Table table = rdbmsStore.get().getTable(name[0], name[1]);
+            // If this has partitions, put it in the list to fetch partions for
+            if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
+              partitionedTables.put(table);
+            }
+            screen("Copying table " + name[0] + "." + name[1]);
+            hbaseStore.get().createTable(table);
+          }
+        } catch (InterruptedException | MetaException | InvalidObjectException e) {
+          throw new RuntimeException(e);
+        }
       }
     }
   }
 
+  /* Partition copying is a little complex.  As we went through and copied the tables we put each
+   * partitioned table into a queue.  We will now go through that queue and add partitions for the
+   * tables.  We do the finding of partitions and writing of them separately and in parallel.
+   * This way if there is one table with >> partitions then all of the others that skew won't
+   * hurt us.  To avoid pulling all of the partitions for a table into memory, we batch up
+   * partitions (by default in batches of 1000) and copy them over in batches.
+   */
   private void copyPartitions() throws MetaException, NoSuchObjectException,
-      InvalidObjectException {
+      InvalidObjectException, InterruptedException {
     screen("Copying partitions");
-    for (Table table : tables) {
-      System.out.print("Copying partitions for table " + table.getDbName() + "." +
-          table.getTableName());
-      for (Partition part : rdbmsStore.getPartitions(table.getDbName(), table.getTableName(), -1)) {
-        LOG.info("Copying " + table.getTableName() + "." + table.getTableName() + "." +
-            StringUtils.join(part.getValues(), ':'));
-        System.out.print('.');
-        hbaseStore.addPartition(part);
+    readersFinished = false;
+    Thread[] readers = new Thread[parallel];
+    Thread[] writers = new Thread[parallel];
+    for (int i = 0; i < parallel; i++) {
+      readers[i] = new PartitionReader();
+      readers[i].start();
+      writers[i] = new PartitionWriter();
+      writers[i].start();
+    }
+
+    for (Thread reader : readers) reader.join();
+    readersFinished = true;
+
+    // Wait until we've finished adding all the partitions
+    for (Thread writer : writers) writer.join();
+  }
+
+  private class PartitionReader extends Thread {
+    @Override
+    public void run() {
+      while (partitionedTables.size() > 0) {
+        try {
+          Table table = partitionedTables.poll(1, TimeUnit.SECONDS);
+          if (table != null) {
+            screen("Fetching partitions for table " + table.getDbName() + "." +
+                table.getTableName());
+            List<String> partNames =
+                rdbmsStore.get().listPartitionNames(table.getDbName(), table.getTableName(),
+                    (short) -1);
+            if (partNames.size() <= batchSize) {
+              LOG.debug("Adding all partition names to queue for " + table.getDbName() + "." +
+                  table.getTableName());
+              partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(), partNames));
+            } else {
+              int goUntil = partNames.size() % batchSize == 0 ? partNames.size() / batchSize :
+                  partNames.size() / batchSize + 1;
+              for (int i = 0; i < goUntil; i++) {
+                int start = i * batchSize;
+                int end = Math.min((i + 1) * batchSize, partNames.size());
+                LOG.debug("Adding partitions " + start + " to " + end + " for " + table.getDbName()
+                    + "." + table.getTableName());
+                partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(),
+                    partNames.subList(start, end)));
+              }
+            }
+          }
+        } catch (InterruptedException | MetaException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  private class PartitionWriter extends Thread {
+    @Override
+    public void run() {
+      // This keeps us from throwing exceptions in our raw store calls
+      Deadline.registerIfNot(1000000);
+      while (!readersFinished || partQueue.size() > 0) {
+        try {
+          PartQueueEntry entry = partQueue.poll(1, TimeUnit.SECONDS);
+          if (entry != null) {
+            LOG.info("Writing partitions " + entry.dbName + "." + entry.tableName + "." +
+                StringUtils.join(entry.partNames, ':'));
+            // Fetch these partitions and write them to HBase
+            Deadline.startTimer("hbaseimport");
+            List<Partition> parts =
+                rdbmsStore.get().getPartitionsByNames(entry.dbName, entry.tableName,
+                    entry.partNames);
+            hbaseStore.get().addPartitions(entry.dbName, entry.tableName, parts);
+            Deadline.stopTimer();
+          }
+        } catch (InterruptedException | MetaException | InvalidObjectException |
+            NoSuchObjectException e) {
+          throw new RuntimeException(e);
+        }
       }
-      System.out.println();
     }
   }
 
   private void copyFunctions() throws MetaException, NoSuchObjectException, InvalidObjectException {
     screen("Copying functions");
+    // Copy any functions from databases we copied.
     for (Database db : dbs) {
       screen("Copying functions in database " + db.getName());
-      for (String funcName : rdbmsStore.getFunctions(db.getName(), "*")) {
-        Function func = rdbmsStore.getFunction(db.getName(), funcName);
-        screen("Copying function " + db.getName() + "." + funcName);
-        hbaseStore.createFunction(func);
+      for (String funcName : rdbmsStore.get().getFunctions(db.getName(), "*")) {
+        copyOneFunction(db.getName(), funcName);
       }
     }
+    // Now do any specifically requested functions
+    if (functionsToImport != null) {
+      for (String compoundFuncName : functionsToImport) {
+        String[] fn = compoundFuncName.split("\\.");
+        if (fn.length != 2) {
+          error(compoundFuncName + " not in proper form.  Must be in form dbname.funcname.  " +
+              "Ignoring this function and continuing.");
+        } else {
+          copyOneFunction(fn[0], fn[1]);
+        }
+      }
+    }
+  }
+
+  private void copyOneFunction(String dbName, String funcName) throws MetaException,
+      InvalidObjectException {
+    Function func = rdbmsStore.get().getFunction(dbName, funcName);
+    screen("Copying function " + dbName + "." + funcName);
+    hbaseStore.get().createFunction(func);
+  }
+
+  private void copyKerberos() throws MetaException {
+    screen("Copying kerberos related items");
+    for (String tokenId : rdbmsStore.get().getAllTokenIdentifiers()) {
+      String token = rdbmsStore.get().getToken(tokenId);
+      hbaseStore.get().addToken(tokenId, token);
+    }
+    for (String masterKey : rdbmsStore.get().getMasterKeys()) {
+      hbaseStore.get().addMasterKey(masterKey);
+    }
   }
 
   private void screen(String msg) {
@@ -173,12 +485,29 @@ public class HBaseImport {
     System.out.println(msg);
   }
 
+  private void error(String msg) {
+    LOG.error(msg);
+    System.err.println("ERROR:  " + msg);
+  }
+
   @VisibleForTesting
-  HBaseImport setConnections(RawStore rdbms, RawStore hbase) {
-    rdbmsStore = rdbms;
-    hbaseStore = hbase;
+  void setConnections(RawStore rdbms, RawStore hbase) {
+    rdbmsStore.set(rdbms);
+    hbaseStore.set(hbase);
+    rdbmsConf = rdbms.getConf();
+    hbaseConf = hbase.getConf();
+  }
 
-    return new HBaseImport();
+  private static class PartQueueEntry {
+    final String dbName;
+    final String tableName;
+    final List<String> partNames;
+
+    PartQueueEntry(String d, String t, List<String> p) {
+      dbName = d;
+      tableName = t;
+      partNames = p;
+    }
   }
 
 }