Posted to commits@hive.apache.org by se...@apache.org on 2015/09/22 23:31:51 UTC
[10/52] [abbrv] hive git commit: HIVE-11389 hbase import should allow partial imports and should work in parallel (gates)
HIVE-11389 hbase import should allow partial imports and should work in parallel (gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0fa45e4a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0fa45e4a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0fa45e4a
Branch: refs/heads/llap
Commit: 0fa45e4a562fc2586b1ef06a88e9c186a0835316
Parents: 7e7f461
Author: Alan Gates <ga...@hortonworks.com>
Authored: Fri Jul 31 11:07:00 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Fri Jul 31 11:07:00 2015 -0700
----------------------------------------------------------------------
.../hive/metastore/hbase/TestHBaseImport.java | 557 +++++++++++++++++--
.../hive/metastore/hbase/HBaseImport.java | 435 +++++++++++++--
2 files changed, 899 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
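For context on the flags this patch adds: the tests below drive the importer exactly as a caller would, by passing command-line-style arguments to the constructor. A minimal sketch under those assumptions (the database name is illustrative, and the constructor and setConnections hook are package-private @VisibleForTesting entry points, so this only compiles from the same package, as in the tests):

    import org.apache.hadoop.hive.metastore.RawStore;

    // Hypothetical wiring: rdbms and hbase stand in for the pre-configured
    // ObjectStore/HBaseStore pair that TestHBaseImport builds below.
    static void importOneDatabase(RawStore rdbms, RawStore hbase) throws Exception {
      // "-d mydb" imports one database plus its tables, partitions, and functions;
      // "-a" imports everything, "-p"/"-b" set the parallel factor and batch size.
      HBaseImport importer = new HBaseImport("-d", "mydb");
      importer.setConnections(rdbms, hbase); // test hook; main() builds stores from configs
      importer.run();
    }

----------------------------------------------------------------------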
http://git-wip-us.apache.org/repos/asf/hive/blob/0fa45e4a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
index 7bdff18..1ac10f0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseImport.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -38,12 +41,16 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Test that import from an RDBMS based metastore works
@@ -52,6 +59,13 @@ public class TestHBaseImport extends HBaseIntegrationTests {
private static final Log LOG = LogFactory.getLog(TestHBaseStoreIntegration.class.getName());
+ private static final String[] tableNames = new String[] {"allnonparttable", "allparttable"};
+ private static final String[] partVals = new String[] {"na", "emea", "latam", "apac"};
+ private static final String[] funcNames = new String[] {"allfunc1", "allfunc2"};
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
@BeforeClass
public static void startup() throws Exception {
HBaseIntegrationTests.startMiniCluster();
@@ -69,25 +83,396 @@ public class TestHBaseImport extends HBaseIntegrationTests {
}
@Test
- public void doImport() throws Exception {
- RawStore rdbms = new ObjectStore();
+ public void importAll() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"alldb1", "alldb2"};
+ String[] roles = new String[] {"allrole1", "allrole2"};
+ String[] tokenIds = new String[] {"alltokenid1", "alltokenid2"};
+ String[] tokens = new String[] {"alltoken1", "alltoken2"};
+ String[] masterKeys = new String[] {"allmk1", "allmk2"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+
+ HBaseImport importer = new HBaseImport("-a");
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ for (int i = 0; i < roles.length; i++) {
+ Role role = store.getRole(roles[i]);
+ Assert.assertNotNull(role);
+ Assert.assertEquals(roles[i], role.getRoleName());
+ }
+ // Make sure there aren't any extra roles
+ Assert.assertEquals(baseNumRoles + 2, store.listRoleNames().size());
+
+ for (int i = 0; i < dbNames.length; i++) {
+ Database db = store.getDatabase(dbNames[i]);
+ Assert.assertNotNull(db);
+ // check one random value in the db rather than every value
+ Assert.assertEquals("file:/tmp", db.getLocationUri());
+
+ Table table = store.getTable(db.getName(), tableNames[0]);
+ Assert.assertNotNull(table);
+ Assert.assertEquals(now, table.getLastAccessTime());
+ Assert.assertEquals("input", table.getSd().getInputFormat());
+
+ table = store.getTable(db.getName(), tableNames[1]);
+ Assert.assertNotNull(table);
+
+ for (int j = 0; j < partVals.length; j++) {
+ Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j]));
+ Assert.assertNotNull(part);
+ Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+ }
+
+ Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size());
+ Assert.assertEquals(2, store.getAllTables(dbNames[i]).size());
+
+ Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size());
+ for (int j = 0; j < funcNames.length; j++) {
+ Assert.assertNotNull(store.getFunction(dbNames[i], funcNames[j]));
+ }
+ }
+
+ Assert.assertEquals(baseNumDbs + 2, store.getAllDatabases().size());
+
+ // I can't test the total number of tokens or master keys because the import grabs them all
+ // and copies them, including any imported by the importSecurity test (if it has already
+ // run). Depending on that test having run would make the tests order-dependent, and JUnit
+ // doesn't guarantee execution order.
+ for (int i = 0; i < tokenIds.length; i++) {
+ Assert.assertEquals(tokens[i], store.getToken(tokenIds[i]));
+ }
+ String[] hbaseKeys = store.getMasterKeys();
+ Set<String> keys = new HashSet<>(Arrays.asList(hbaseKeys));
+ for (int i = 0; i < masterKeys.length; i++) {
+ Assert.assertTrue(keys.contains(masterKeys[i]));
+ }
+ }
+
+ @Test
+ public void importOneDb() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"onedbdb1", "onedbdb2"};
+ String[] roles = new String[] {"onedbrole1", "onedbrole2"};
+ String[] tokenIds = new String[] {"onedbtokenid1", "onedbtokenid2"};
+ String[] tokens = new String[] {"onedbtoken1", "onedbtoken2"};
+ String[] masterKeys = new String[] {"onedbmk1", "onedbmk2"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+ int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+ store.getAllTokenIdentifiers().size();
+ int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+ HBaseImport importer = new HBaseImport("-d", dbNames[0]);
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ // Make sure there aren't any extra roles
+ Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+ Database db = store.getDatabase(dbNames[0]);
+ Assert.assertNotNull(db);
+ // check one random value in the db rather than every value
+ Assert.assertEquals("file:/tmp", db.getLocationUri());
+
+ Table table = store.getTable(db.getName(), tableNames[0]);
+ Assert.assertNotNull(table);
+ Assert.assertEquals(now, table.getLastAccessTime());
+ Assert.assertEquals("input", table.getSd().getInputFormat());
+
+ table = store.getTable(db.getName(), tableNames[1]);
+ Assert.assertNotNull(table);
+
+ for (int j = 0; j < partVals.length; j++) {
+ Partition part = store.getPartition(dbNames[0], tableNames[1], Arrays.asList(partVals[j]));
+ Assert.assertNotNull(part);
+ Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+ }
+
+ Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size());
+ Assert.assertEquals(2, store.getAllTables(dbNames[0]).size());
+
+ Assert.assertEquals(2, store.getFunctions(dbNames[0], "*").size());
+ for (int j = 0; j < funcNames.length; j++) {
+ Assert.assertNotNull(store.getFunction(dbNames[0], funcNames[j]));
+ }
+
+ Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+ Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+ String[] hbaseKeys = store.getMasterKeys();
+ Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+
+ // Have to do this last as it will throw an exception
+ thrown.expect(NoSuchObjectException.class);
+ store.getDatabase(dbNames[1]);
+ }
+
+ @Test
+ public void importOneFunc() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"onefuncdb1", "onefuncdb2"};
+ String[] roles = new String[] {"onefuncrole1", "onefuncrole2"};
+ String[] tokenIds = new String[] {"onefunctokenid1", "onefunctokenid2"};
+ String[] tokens = new String[] {"onefunctoken1", "onefunctoken2"};
+ String[] masterKeys = new String[] {"onefuncmk1", "onefuncmk2"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+ int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+ store.getAllTokenIdentifiers().size();
+ int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+ // Create the database so I can put the function in it.
+ store.createDatabase(
+ new Database(dbNames[0], "no description", "file:/tmp", emptyParameters));
+
+ HBaseImport importer = new HBaseImport("-f", dbNames[0] + "." + funcNames[0]);
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ // Make sure there aren't any extra roles
+ Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+ Database db = store.getDatabase(dbNames[0]);
+ Assert.assertNotNull(db);
+
+ Assert.assertEquals(0, store.getAllTables(dbNames[0]).size());
+ Assert.assertEquals(1, store.getFunctions(dbNames[0], "*").size());
+ Assert.assertNotNull(store.getFunction(dbNames[0], funcNames[0]));
+ Assert.assertNull(store.getFunction(dbNames[0], funcNames[1]));
+
+ Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+ Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+ String[] hbaseKeys = store.getMasterKeys();
+ Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+ }
+
+ @Test
+ public void importOneTableNonPartitioned() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"onetabdb1", "onetabdb2"};
+ String[] roles = new String[] {"onetabrole1", "onetabrole2"};
+ String[] tokenIds = new String[] {"onetabtokenid1", "onetabtokenid2"};
+ String[] tokens = new String[] {"onetabtoken1", "onetabtoken2"};
+ String[] masterKeys = new String[] {"onetabmk1", "onetabmk2"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+ int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+ store.getAllTokenIdentifiers().size();
+ int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+ // Create the database so I can put the table in it.
+ store.createDatabase(
+ new Database(dbNames[0], "no description", "file:/tmp", emptyParameters));
+
+ HBaseImport importer = new HBaseImport("-t", dbNames[0] + "." + tableNames[0]);
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ // Make sure there aren't any extra roles
+ Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+ Database db = store.getDatabase(dbNames[0]);
+ Assert.assertNotNull(db);
+
+ Table table = store.getTable(db.getName(), tableNames[0]);
+ Assert.assertNotNull(table);
+ Assert.assertEquals(1, store.getAllTables(db.getName()).size());
+ Assert.assertNull(store.getTable(db.getName(), tableNames[1]));
+
+ Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size());
+ Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+ Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+ String[] hbaseKeys = store.getMasterKeys();
+ Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+
+ }
+
+ @Test
+ public void importOneTablePartitioned() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"onetabpartdb1", "onetabpartodb2"};
+ String[] roles = new String[] {"onetabpartorole1", "onetabpartorole2"};
+ String[] tokenIds = new String[] {"onetabpartotokenid1", "onetabpartotokenid2"};
+ String[] tokens = new String[] {"onetabpartotoken1", "onetabpartotoken2"};
+ String[] masterKeys = new String[] {"onetabpartomk1", "onetabpartomk2"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+ int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+ store.getAllTokenIdentifiers().size();
+ int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+ // Create the database so I can put the table in it.
+ store.createDatabase(
+ new Database(dbNames[0], "no description", "file:/tmp", emptyParameters));
+
+ HBaseImport importer = new HBaseImport("-t", dbNames[0] + "." + tableNames[1]);
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ // Make sure there aren't any extra roles
+ Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+ Database db = store.getDatabase(dbNames[0]);
+ Assert.assertNotNull(db);
+
+ Table table = store.getTable(db.getName(), tableNames[1]);
+ Assert.assertNotNull(table);
+ Assert.assertEquals(1, store.getAllTables(db.getName()).size());
+
+ for (int j = 0; j < partVals.length; j++) {
+ Partition part = store.getPartition(dbNames[0], tableNames[1], Arrays.asList(partVals[j]));
+ Assert.assertNotNull(part);
+ Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+ }
+ Assert.assertEquals(4, store.getPartitions(dbNames[0], tableNames[1], -1).size());
+
+ Assert.assertNull(store.getTable(db.getName(), tableNames[0]));
+
+ Assert.assertEquals(0, store.getFunctions(dbNames[0], "*").size());
+ Assert.assertEquals(baseNumDbs + 1, store.getAllDatabases().size());
+
+ Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+ String[] hbaseKeys = store.getMasterKeys();
+ Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+ }
+
+ @Test
+ public void importSecurity() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"securitydb1", "securitydb2"};
+ String[] roles = new String[] {"securityrole1", "securityrole2"};
+ String[] tokenIds = new String[] {"securitytokenid1", "securitytokenid2"};
+ String[] tokens = new String[] {"securitytoken1", "securitytoken2"};
+ String[] masterKeys = new String[] {"securitymk1", "securitymk2"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+
+ HBaseImport importer = new HBaseImport("-k");
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ Assert.assertEquals(baseNumRoles, store.listRoleNames().size());
+
+ Assert.assertEquals(baseNumDbs, store.getAllDatabases().size());
+
+ // I can't test the total number of tokens or master keys because the import grabs them all
+ // and copies them, including any imported by the importAll test (if it has already run).
+ // Depending on that test having run would make the tests order-dependent, and JUnit
+ // doesn't guarantee execution order.
+ for (int i = 0; i < tokenIds.length; i++) {
+ Assert.assertEquals(tokens[i], store.getToken(tokenIds[i]));
+ }
+ String[] hbaseKeys = store.getMasterKeys();
+ Set<String> keys = new HashSet<>(Arrays.asList(hbaseKeys));
+ for (int i = 0; i < masterKeys.length; i++) {
+ Assert.assertTrue(keys.contains(masterKeys[i]));
+ }
+ }
+
+ // TODO test for bogus function name
+ // TODO test for bogus table name
+ // TODO test for non-existent items
+
+ @Test
+ public void importOneRole() throws Exception {
+ RawStore rdbms;
+ rdbms = new ObjectStore();
rdbms.setConf(conf);
- String[] dbNames = new String[] {"importdb1", "importdb2"};
- String[] tableNames = new String[] {"nonparttable", "parttable"};
- String[] partVals = new String[] {"na", "emea", "latam", "apac"};
- String[] funcNames = new String[] {"func1", "func2"};
- String[] roles = new String[] {"role1", "role2"};
+ String[] dbNames = new String[] {"oneroledb1", "oneroledb2"};
+ String[] roles = new String[] {"onerolerole1", "onerolerole2"};
+ String[] tokenIds = new String[] {"oneroletokenid1", "oneroletokenid2"};
+ String[] tokens = new String[] {"oneroletoken1", "oneroletoken2"};
+ String[] masterKeys = new String[] {"onerolemk1", "onerolemk2"};
int now = (int)System.currentTimeMillis() / 1000;
+ setupObjectStore(rdbms, roles, dbNames, tokenIds, tokens, masterKeys, now);
+
+ int baseNumRoles = store.listRoleNames() == null ? 0 : store.listRoleNames().size();
+ int baseNumDbs = store.getAllDatabases() == null ? 0 : store.getAllDatabases().size();
+ int baseNumToks = store.getAllTokenIdentifiers() == null ? 0 :
+ store.getAllTokenIdentifiers().size();
+ int baseNumKeys = store.getMasterKeys() == null ? 0 : store.getMasterKeys().length;
+
+ HBaseImport importer = new HBaseImport("-r", roles[0]);
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ Role role = store.getRole(roles[0]);
+ Assert.assertNotNull(role);
+ Assert.assertEquals(roles[0], role.getRoleName());
+
+ // Make sure there aren't any extra roles
+ Assert.assertEquals(baseNumRoles + 1, store.listRoleNames().size());
+ Assert.assertEquals(baseNumDbs, store.getAllDatabases().size());
+
+ Assert.assertEquals(baseNumToks, store.getAllTokenIdentifiers().size());
+ String[] hbaseKeys = store.getMasterKeys();
+ Assert.assertEquals(baseNumKeys, hbaseKeys.length);
+
+ // Have to do this last as it will throw an exception
+ thrown.expect(NoSuchObjectException.class);
+ store.getRole(roles[1]);
+ }
+
+ private void setupObjectStore(RawStore rdbms, String[] roles, String[] dbNames,
+ String[] tokenIds, String[] tokens, String[] masterKeys, int now)
+ throws MetaException, InvalidObjectException, NoSuchObjectException {
for (int i = 0; i < roles.length; i++) {
rdbms.addRole(roles[i], "me");
}
for (int i = 0; i < dbNames.length; i++) {
- rdbms.createDatabase(new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+ rdbms.createDatabase(
+ new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
- List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ List<FieldSchema> cols = new ArrayList<>();
cols.add(new FieldSchema("col1", "int", "nocomment"));
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
@@ -95,7 +480,7 @@ public class TestHBaseImport extends HBaseIntegrationTests {
rdbms.createTable(new Table(tableNames[0], dbNames[i], "me", now, now, 0, sd, null,
emptyParameters, null, null, null));
- List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ List<FieldSchema> partCols = new ArrayList<>();
partCols.add(new FieldSchema("region", "string", ""));
rdbms.createTable(new Table(tableNames[1], dbNames[i], "me", now, now, 0, sd, partCols,
emptyParameters, null, null, null));
@@ -105,57 +490,149 @@ public class TestHBaseImport extends HBaseIntegrationTests {
psd.setLocation("file:/tmp/region=" + partVals[j]);
Partition part = new Partition(Arrays.asList(partVals[j]), dbNames[i], tableNames[1],
now, now, psd, emptyParameters);
- store.addPartition(part);
+ rdbms.addPartition(part);
}
for (String funcName : funcNames) {
- store.createFunction(new Function(funcName, dbNames[i], "classname", "ownername",
- PrincipalType.USER, (int)System.currentTimeMillis()/1000, FunctionType.JAVA,
+ LOG.debug("Creating new function " + dbNames[i] + "." + funcName);
+ rdbms.createFunction(new Function(funcName, dbNames[i], "classname", "ownername",
+ PrincipalType.USER, (int) System.currentTimeMillis() / 1000, FunctionType.JAVA,
Arrays.asList(new ResourceUri(ResourceType.JAR, "uri"))));
}
}
+ for (int i = 0; i < tokenIds.length; i++) rdbms.addToken(tokenIds[i], tokens[i]);
+ for (int i = 0; i < masterKeys.length; i++) {
+ rdbms.addMasterKey(masterKeys[i]);
+ }
+ }
- HBaseImport importer = new HBaseImport();
- importer.setConnections(rdbms, store);
- importer.run();
+ @Test
+ public void parallel() throws Exception {
+ int parallelFactor = 10;
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
- for (int i = 0; i < roles.length; i++) {
- Role role = store.getRole(roles[i]);
- Assert.assertNotNull(role);
- Assert.assertEquals(roles[i], role.getRoleName());
+ String[] dbNames = new String[] {"paralleldb1"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ for (int i = 0; i < dbNames.length; i++) {
+ rdbms.createDatabase(
+ new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, emptyParameters);
+
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("region", "string", ""));
+ for (int j = 0; j < parallelFactor; j++) {
+ rdbms.createTable(new Table("t" + j, dbNames[i], "me", now, now, 0, sd, partCols,
+ emptyParameters, null, null, null));
+ for (int k = 0; k < parallelFactor; k++) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/region=" + k);
+ Partition part = new Partition(Arrays.asList("p" + k), dbNames[i], "t" + j,
+ now, now, psd, emptyParameters);
+ rdbms.addPartition(part);
+ }
+ }
}
- // Make sure there aren't any extra roles
- Assert.assertEquals(2, store.listRoleNames().size());
+
+ HBaseImport importer = new HBaseImport("-p", "2", "-b", "2", "-d", dbNames[0]);
+ importer.setConnections(rdbms, store);
+ importer.run();
for (int i = 0; i < dbNames.length; i++) {
Database db = store.getDatabase(dbNames[i]);
Assert.assertNotNull(db);
- // check one random value in the db rather than every value
- Assert.assertEquals("file:/tmp", db.getLocationUri());
- Table table = store.getTable(db.getName(), tableNames[0]);
- Assert.assertNotNull(table);
- Assert.assertEquals(now, table.getLastAccessTime());
- Assert.assertEquals("input", table.getSd().getInputFormat());
+ for (int j = 0; j < parallelFactor; j++) {
+ Table table = store.getTable(db.getName(), "t" + j);
+ Assert.assertNotNull(table);
+ Assert.assertEquals(now, table.getLastAccessTime());
+ Assert.assertEquals("input", table.getSd().getInputFormat());
- table = store.getTable(db.getName(), tableNames[1]);
- Assert.assertNotNull(table);
+ for (int k = 0; k < parallelFactor; k++) {
+ Partition part =
+ store.getPartition(dbNames[i], "t" + j, Arrays.asList("p" + k));
+ Assert.assertNotNull(part);
+ Assert.assertEquals("file:/tmp/region=" + k, part.getSd().getLocation());
+ }
- for (int j = 0; j < partVals.length; j++) {
- Partition part = store.getPartition(dbNames[i], tableNames[1], Arrays.asList(partVals[j]));
- Assert.assertNotNull(part);
- Assert.assertEquals("file:/tmp/region=" + partVals[j], part.getSd().getLocation());
+ Assert.assertEquals(parallelFactor, store.getPartitions(dbNames[i], "t" + j, -1).size());
}
+ Assert.assertEquals(parallelFactor, store.getAllTables(dbNames[i]).size());
- Assert.assertEquals(4, store.getPartitions(dbNames[i], tableNames[1], -1).size());
- Assert.assertEquals(2, store.getAllTables(dbNames[i]).size());
+ }
+ }
- Assert.assertEquals(2, store.getFunctions(dbNames[i], "*").size());
- for (int j = 0; j < funcNames.length; j++) {
- Assert.assertNotNull(store.getFunction(dbNames[i], funcNames[j]));
+ // Same as the test above except we create 9 of everything instead of 10. This matters
+ // because with a batch size of 2 the previous test guarantees 10 / 2 = 5 writes on the
+ // partition queue, each with exactly 2 entries. In this test we handle the case where the
+ // last entry in the queue has fewer partitions.
+ @Test
+ public void parallelOdd() throws Exception {
+ int parallelFactor = 9;
+ RawStore rdbms;
+ rdbms = new ObjectStore();
+ rdbms.setConf(conf);
+
+ String[] dbNames = new String[] {"oddparalleldb1"};
+ int now = (int)System.currentTimeMillis() / 1000;
+
+ for (int i = 0; i < dbNames.length; i++) {
+ rdbms.createDatabase(
+ new Database(dbNames[i], "no description", "file:/tmp", emptyParameters));
+
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema("col1", "int", "nocomment"));
+ SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
+ StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
+ serde, null, null, emptyParameters);
+
+ List<FieldSchema> partCols = new ArrayList<>();
+ partCols.add(new FieldSchema("region", "string", ""));
+ for (int j = 0; j < parallelFactor; j++) {
+ rdbms.createTable(new Table("t" + j, dbNames[i], "me", now, now, 0, sd, partCols,
+ emptyParameters, null, null, null));
+ for (int k = 0; k < parallelFactor; k++) {
+ StorageDescriptor psd = new StorageDescriptor(sd);
+ psd.setLocation("file:/tmp/region=" + k);
+ Partition part = new Partition(Arrays.asList("p" + k), dbNames[i], "t" + j,
+ now, now, psd, emptyParameters);
+ rdbms.addPartition(part);
+ }
}
}
- Assert.assertEquals(2, store.getAllDatabases().size());
+ HBaseImport importer = new HBaseImport("-p", "2", "-b", "2", "-d", dbNames[0]);
+ importer.setConnections(rdbms, store);
+ importer.run();
+
+ for (int i = 0; i < dbNames.length; i++) {
+ Database db = store.getDatabase(dbNames[i]);
+ Assert.assertNotNull(db);
+
+ for (int j = 0; j < parallelFactor; j++) {
+ Table table = store.getTable(db.getName(), "t" + j);
+ Assert.assertNotNull(table);
+ Assert.assertEquals(now, table.getLastAccessTime());
+ Assert.assertEquals("input", table.getSd().getInputFormat());
+
+ for (int k = 0; k < parallelFactor; k++) {
+ Partition part =
+ store.getPartition(dbNames[i], "t" + j, Arrays.asList("p" + k));
+ Assert.assertNotNull(part);
+ Assert.assertEquals("file:/tmp/region=" + k, part.getSd().getLocation());
+ }
+
+ Assert.assertEquals(parallelFactor, store.getPartitions(dbNames[i], "t" + j, -1).size());
+ }
+ Assert.assertEquals(parallelFactor, store.getAllTables(dbNames[i]).size());
+
+ }
}
}
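The importer's worker threads below (TableCopier, PartitionReader, PartitionWriter) all drain their queues with a timed poll against a completion flag rather than a blocking take, so a worker can exit once its producer is done. A stripped-down sketch of that pattern with illustrative names (the patch itself uses plain boolean fields writingToQueue and readersFinished; volatile is added here for safety in isolation):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Minimal sketch of the drain loop used by the copier threads: keep running
    // while the producer is active or items remain, and poll with a timeout so
    // the loop re-checks the flag instead of blocking forever on an empty queue.
    class DrainWorker extends Thread {
      private final BlockingQueue<String> queue;
      private volatile boolean producerActive = true;

      DrainWorker(BlockingQueue<String> queue) { this.queue = queue; }

      void producerDone() { producerActive = false; }

      @Override
      public void run() {
        try {
          while (producerActive || !queue.isEmpty()) {
            String item = queue.poll(1, TimeUnit.SECONDS);
            if (item != null) {
              System.out.println("copied " + item); // stand-in for the real copy work
            }
          }
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }
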
http://git-wip-us.apache.org/repos/asf/hive/blob/0fa45e4a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
index e416b8a..e143de7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseImport.java
@@ -19,11 +19,18 @@
package org.apache.hadoop.hive.metastore.hbase;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Deadline;
import org.apache.hadoop.hive.metastore.ObjectStore;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -36,23 +43,41 @@ import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.Table;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
/**
* A tool to take the contents of an RDBMS based Hive metastore and import it into an HBase based
* one. To use this the config files for Hive configured to work with the RDBMS (that is,
- * including the JDBC string, etc.) and for HBase must be in the path. This tool will then
+ * including the JDBC string, etc.) as well as HBase configuration files must be in the path.
+ * There should not be a hive-site.xml that specifies HBaseStore in the path. This tool will then
* handle connecting to the RDBMS via the {@link org.apache.hadoop.hive.metastore.ObjectStore}
* and HBase via {@link org.apache.hadoop.hive.metastore.hbase.HBaseStore} and transferring the
* data.
+ *
+ * This tool can import an entire metastore or only selected objects. When selecting objects it
+ * is necessary to fully specify the object's name. For example, if you want to import the table
+ * T in the default database it needs to be identified as default.T. The same is true for
+ * functions. When an object is specified, everything under that object will be imported (e.g.
+ * if you select database D, then all tables and functions in that database will be
+ * imported as well).
+ *
+ * At this point only tables and partitions are handled in parallel as it is assumed there are
+ * relatively few of everything else.
+ *
+ * Note that HBaseSchemaTool must have already been used to create the appropriate tables in HBase.
*/
public class HBaseImport {
static final private Log LOG = LogFactory.getLog(HBaseImport.class.getName());
public static void main(String[] args) {
- HBaseImport tool = new HBaseImport();
try {
+ HBaseImport tool = new HBaseImport(args);
tool.run();
} catch (Exception e) {
System.err.println("Caught exception " + e.getClass().getName() + " with message <" +
@@ -60,112 +85,399 @@ public class HBaseImport {
}
}
+ private ThreadLocal<RawStore> rdbmsStore = new ThreadLocal<RawStore>() {
+ @Override
+ protected RawStore initialValue() {
+ if (rdbmsConf == null) {
+ throw new RuntimeException("order violation, need to set rdbms conf first");
+ }
+ RawStore os = new ObjectStore();
+ os.setConf(rdbmsConf);
+ return os;
+ }
+ };
+
+ private ThreadLocal<RawStore> hbaseStore = new ThreadLocal<RawStore>() {
+ @Override
+ protected RawStore initialValue() {
+ if (hbaseConf == null) {
+ throw new RuntimeException("order violation, need to set hbase conf first");
+ }
+ RawStore hs = new HBaseStore();
+ hs.setConf(hbaseConf);
+ return hs;
+ }
+ };
+
private Configuration rdbmsConf;
private Configuration hbaseConf;
- private RawStore rdbmsStore;
- private RawStore hbaseStore;
private List<Database> dbs;
- private List<Table> tables;
+ private BlockingQueue<Table> partitionedTables;
+ private BlockingQueue<String[]> tableNameQueue;
+ private BlockingQueue<PartQueueEntry> partQueue;
+ private boolean writingToQueue, readersFinished;
+ private boolean doKerberos, doAll;
+ private List<String> rolesToImport, dbsToImport, tablesToImport, functionsToImport;
+ private int parallel;
+ private int batchSize;
@VisibleForTesting
- HBaseImport() {
- dbs = new ArrayList<Database>();
- tables = new ArrayList<Table>();
+ HBaseImport(String... args) throws ParseException {
+ Options options = new Options();
+
+ doAll = doKerberos = false;
+ parallel = 1;
+ batchSize = 1000;
+
+ options.addOption(OptionBuilder
+ .withLongOpt("all")
+ .withDescription("Import the full metastore")
+ .create('a'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("batchsize")
+ .withDescription("Number of partitions to read and write in a batch, defaults to 1000")
+ .hasArg()
+ .create('b'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("database")
+ .withDescription("Import a single database")
+ .hasArgs()
+ .create('d'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("help")
+ .withDescription("You're looking at it")
+ .create('h'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("function")
+ .withDescription("Import a single function")
+ .hasArgs()
+ .create('f'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("kerberos")
+ .withDescription("Import all kerberos related objects (master key, tokens)")
+ .create('k'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("parallel")
+ .withDescription("Parallel factor for loading (only applied to tables and partitions), " +
+ "defaults to 1")
+ .hasArg()
+ .create('p'));
+
+ options.addOption(OptionBuilder
+ .withLongOpt("role")
+ .withDescription("Import a single role")
+ .hasArgs()
+ .create('r'));
+ options.addOption(OptionBuilder
+ .withLongOpt("tables")
+ .withDescription("Import a single tables")
+ .hasArgs()
+ .create('t'));
+
+ CommandLine cli = new GnuParser().parse(options, args);
+
+ // Process help, if it was asked for, this must be done first
+ if (cli.hasOption('h')) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("hbaseimport", options);
+ // returning here results in nothing else happening, because none of the other flags have
+ // been set.
+ return;
+ }
+
+ // Now process the other command line args
+ if (cli.hasOption('a')) {
+ doAll = true;
+ }
+ if (cli.hasOption('b')) {
+ batchSize = Integer.valueOf(cli.getOptionValue('b'));
+ }
+ if (cli.hasOption('d')) {
+ dbsToImport = Arrays.asList(cli.getOptionValues('d'));
+ }
+ if (cli.hasOption('f')) {
+ functionsToImport = Arrays.asList(cli.getOptionValues('f'));
+ }
+ if (cli.hasOption('p')) {
+ parallel = Integer.valueOf(cli.getOptionValue('p'));
+ }
+ if (cli.hasOption('r')) {
+ rolesToImport = Arrays.asList(cli.getOptionValues('r'));
+ }
+ if (cli.hasOption('k')) {
+ doKerberos = true;
+ }
+ if (cli.hasOption('t')) {
+ tablesToImport = Arrays.asList(cli.getOptionValues('t'));
+ }
+
+ dbs = new ArrayList<>();
+ // We don't want to bound the size of the table queue because we keep it all in memory
+ partitionedTables = new LinkedBlockingQueue<>();
+ tableNameQueue = new LinkedBlockingQueue<>();
+
+ // Bound the size of this queue so we don't get too much in memory.
+ partQueue = new ArrayBlockingQueue<>(parallel * 2);
}
@VisibleForTesting
void run() throws MetaException, InstantiationException, IllegalAccessException,
- NoSuchObjectException, InvalidObjectException {
+ NoSuchObjectException, InvalidObjectException, InterruptedException {
// Order here is crucial, as you can't add tables until you've added databases, etc.
init();
- copyRoles();
- copyDbs();
- copyTables();
- copyPartitions();
- copyFunctions();
+ if (doAll || rolesToImport != null) {
+ copyRoles();
+ }
+ if (doAll || dbsToImport != null) {
+ copyDbs();
+ }
+ if (doAll || dbsToImport != null || tablesToImport != null) {
+ copyTables();
+ copyPartitions();
+ }
+ if (doAll || dbsToImport != null || functionsToImport != null) {
+ copyFunctions();
+ }
+ if (doAll || doKerberos) {
+ copyKerberos();
+ }
}
private void init() throws MetaException, IllegalAccessException, InstantiationException {
- if (rdbmsStore != null) {
+ if (rdbmsConf != null) {
// We've been configured for testing, so don't do anything here.
return;
}
- rdbmsConf = new HiveConf(); // We're depending on having everything properly in the path
- hbaseConf = new HiveConf();
+ // We're depending on having everything properly in the path
+ rdbmsConf = new HiveConf();
+ hbaseConf = new HiveConf();
HiveConf.setVar(hbaseConf, HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
HBaseStore.class.getName());
HiveConf.setBoolVar(hbaseConf, HiveConf.ConfVars.METASTORE_FASTPATH, true);
// First get a connection to the RDBMS based store
- rdbmsStore = new ObjectStore();
- rdbmsStore.setConf(rdbmsConf);
+ rdbmsStore.get().setConf(rdbmsConf);
// Get a connection to the HBase based store
- hbaseStore = new HBaseStore();
- hbaseStore.setConf(hbaseConf);
- // This will go create the tables if they don't exist
- hbaseStore.verifySchema();
+ hbaseStore.get().setConf(hbaseConf);
}
private void copyRoles() throws NoSuchObjectException, InvalidObjectException, MetaException {
screen("Copying roles");
- for (String roleName : rdbmsStore.listRoleNames()) {
- Role role = rdbmsStore.getRole(roleName);
+ List<String> toCopy = doAll ? rdbmsStore.get().listRoleNames() : rolesToImport;
+ for (String roleName : toCopy) {
+ Role role = rdbmsStore.get().getRole(roleName);
screen("Copying role " + roleName);
- hbaseStore.addRole(roleName, role.getOwnerName());
+ hbaseStore.get().addRole(roleName, role.getOwnerName());
}
}
private void copyDbs() throws MetaException, NoSuchObjectException, InvalidObjectException {
screen("Copying databases");
- for (String dbName : rdbmsStore.getAllDatabases()) {
- Database db = rdbmsStore.getDatabase(dbName);
+ List<String> toCopy = doAll ? rdbmsStore.get().getAllDatabases() : dbsToImport;
+ for (String dbName : toCopy) {
+ Database db = rdbmsStore.get().getDatabase(dbName);
dbs.add(db);
screen("Copying database " + dbName);
- hbaseStore.createDatabase(db);
+ hbaseStore.get().createDatabase(db);
}
}
- private void copyTables() throws MetaException, InvalidObjectException {
+ private void copyTables() throws MetaException, InvalidObjectException, InterruptedException {
screen("Copying tables");
+
+ // Start the parallel threads that will copy the tables
+ Thread[] copiers = new Thread[parallel];
+ writingToQueue = true;
+ for (int i = 0; i < parallel; i++) {
+ copiers[i] = new TableCopier();
+ copiers[i].start();
+ }
+
+ // Put tables from the databases we copied into the queue
for (Database db : dbs) {
screen("Coyping tables in database " + db.getName());
- for (String tableName : rdbmsStore.getAllTables(db.getName())) {
- Table table = rdbmsStore.getTable(db.getName(), tableName);
- tables.add(table);
- screen("Copying table " + db.getName() + "." + tableName);
- hbaseStore.createTable(table);
+ for (String tableName : rdbmsStore.get().getAllTables(db.getName())) {
+ tableNameQueue.put(new String[]{db.getName(), tableName});
+ }
+ }
+
+ // Now put any specifically requested tables into the queue
+ if (tablesToImport != null) {
+ for (String compoundTableName : tablesToImport) {
+ String[] tn = compoundTableName.split("\\.");
+ if (tn.length != 2) {
+ error(compoundTableName + " not in proper form. Must be in form dbname.tablename. " +
+ "Ignoring this table and continuing.");
+ } else {
+ tableNameQueue.put(new String[]{tn[0], tn[1]});
+ }
+ }
+ }
+ writingToQueue = false;
+
+ // Wait until we've finished adding all the tables
+ for (Thread copier : copiers) copier.join();
+ }
+
+ private class TableCopier extends Thread {
+ @Override
+ public void run() {
+ while (writingToQueue || tableNameQueue.size() > 0) {
+ try {
+ String[] name = tableNameQueue.poll(1, TimeUnit.SECONDS);
+ if (name != null) {
+ Table table = rdbmsStore.get().getTable(name[0], name[1]);
+ // If this has partitions, put it in the list to fetch partitions for
+ if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) {
+ partitionedTables.put(table);
+ }
+ screen("Copying table " + name[0] + "." + name[1]);
+ hbaseStore.get().createTable(table);
+ }
+ } catch (InterruptedException | MetaException | InvalidObjectException e) {
+ throw new RuntimeException(e);
+ }
}
}
}
+ /* Partition copying is a little complex. As we went through and copied the tables we put
+ * each partitioned table into a queue. We now go through that queue and add partitions for
+ * those tables. We do the finding of partitions and the writing of them separately and in
+ * parallel, so that one table with far more partitions than the others won't hurt us with
+ * that skew. To avoid pulling all of a table's partitions into memory at once, we batch up
+ * the partition names (by default 1000 at a time) and copy the partitions over one batch
+ * at a time.
+ */
private void copyPartitions() throws MetaException, NoSuchObjectException,
- InvalidObjectException {
+ InvalidObjectException, InterruptedException {
screen("Copying partitions");
- for (Table table : tables) {
- System.out.print("Copying partitions for table " + table.getDbName() + "." +
- table.getTableName());
- for (Partition part : rdbmsStore.getPartitions(table.getDbName(), table.getTableName(), -1)) {
- LOG.info("Copying " + table.getTableName() + "." + table.getTableName() + "." +
- StringUtils.join(part.getValues(), ':'));
- System.out.print('.');
- hbaseStore.addPartition(part);
+ readersFinished = false;
+ Thread[] readers = new Thread[parallel];
+ Thread[] writers = new Thread[parallel];
+ for (int i = 0; i < parallel; i++) {
+ readers[i] = new PartitionReader();
+ readers[i].start();
+ writers[i] = new PartitionWriter();
+ writers[i].start();
+ }
+
+ for (Thread reader : readers) reader.join();
+ readersFinished = true;
+
+ // Wait until we've finished adding all the partitions
+ for (Thread writer : writers) writer.join();
+ }
+
+ private class PartitionReader extends Thread {
+ @Override
+ public void run() {
+ while (partitionedTables.size() > 0) {
+ try {
+ Table table = partitionedTables.poll(1, TimeUnit.SECONDS);
+ if (table != null) {
+ screen("Fetching partitions for table " + table.getDbName() + "." +
+ table.getTableName());
+ List<String> partNames =
+ rdbmsStore.get().listPartitionNames(table.getDbName(), table.getTableName(),
+ (short) -1);
+ if (partNames.size() <= batchSize) {
+ LOG.debug("Adding all partition names to queue for " + table.getDbName() + "." +
+ table.getTableName());
+ partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(), partNames));
+ } else {
+ int goUntil = partNames.size() % batchSize == 0 ? partNames.size() / batchSize :
+ partNames.size() / batchSize + 1;
+ for (int i = 0; i < goUntil; i++) {
+ int start = i * batchSize;
+ int end = Math.min((i + 1) * batchSize, partNames.size());
+ LOG.debug("Adding partitions " + start + " to " + end + " for " + table.getDbName()
+ + "." + table.getTableName());
+ partQueue.put(new PartQueueEntry(table.getDbName(), table.getTableName(),
+ partNames.subList(start, end)));
+ }
+ }
+ }
+ } catch (InterruptedException | MetaException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private class PartitionWriter extends Thread {
+ @Override
+ public void run() {
+ // This keeps us from throwing exceptions in our raw store calls
+ Deadline.registerIfNot(1000000);
+ while (!readersFinished || partQueue.size() > 0) {
+ try {
+ PartQueueEntry entry = partQueue.poll(1, TimeUnit.SECONDS);
+ if (entry != null) {
+ LOG.info("Writing partitions " + entry.dbName + "." + entry.tableName + "." +
+ StringUtils.join(entry.partNames, ':'));
+ // Fetch these partitions and write them to HBase
+ Deadline.startTimer("hbaseimport");
+ List<Partition> parts =
+ rdbmsStore.get().getPartitionsByNames(entry.dbName, entry.tableName,
+ entry.partNames);
+ hbaseStore.get().addPartitions(entry.dbName, entry.tableName, parts);
+ Deadline.stopTimer();
+ }
+ } catch (InterruptedException | MetaException | InvalidObjectException |
+ NoSuchObjectException e) {
+ throw new RuntimeException(e);
+ }
}
- System.out.println();
}
}
private void copyFunctions() throws MetaException, NoSuchObjectException, InvalidObjectException {
screen("Copying functions");
+ // Copy any functions from databases we copied.
for (Database db : dbs) {
screen("Copying functions in database " + db.getName());
- for (String funcName : rdbmsStore.getFunctions(db.getName(), "*")) {
- Function func = rdbmsStore.getFunction(db.getName(), funcName);
- screen("Copying function " + db.getName() + "." + funcName);
- hbaseStore.createFunction(func);
+ for (String funcName : rdbmsStore.get().getFunctions(db.getName(), "*")) {
+ copyOneFunction(db.getName(), funcName);
}
}
+ // Now do any specifically requested functions
+ if (functionsToImport != null) {
+ for (String compoundFuncName : functionsToImport) {
+ String[] fn = compoundFuncName.split("\\.");
+ if (fn.length != 2) {
+ error(compoundFuncName + " not in proper form. Must be in form dbname.funcname. " +
+ "Ignoring this function and continuing.");
+ } else {
+ copyOneFunction(fn[0], fn[1]);
+ }
+ }
+ }
+ }
+
+ private void copyOneFunction(String dbName, String funcName) throws MetaException,
+ InvalidObjectException {
+ Function func = rdbmsStore.get().getFunction(dbName, funcName);
+ screen("Copying function " + dbName + "." + funcName);
+ hbaseStore.get().createFunction(func);
+ }
+
+ private void copyKerberos() throws MetaException {
+ screen("Copying kerberos related items");
+ for (String tokenId : rdbmsStore.get().getAllTokenIdentifiers()) {
+ String token = rdbmsStore.get().getToken(tokenId);
+ hbaseStore.get().addToken(tokenId, token);
+ }
+ for (String masterKey : rdbmsStore.get().getMasterKeys()) {
+ hbaseStore.get().addMasterKey(masterKey);
+ }
}
private void screen(String msg) {
@@ -173,12 +485,29 @@ public class HBaseImport {
System.out.println(msg);
}
+ private void error(String msg) {
+ LOG.error(msg);
+ System.err.println("ERROR: " + msg);
+ }
+
@VisibleForTesting
- HBaseImport setConnections(RawStore rdbms, RawStore hbase) {
- rdbmsStore = rdbms;
- hbaseStore = hbase;
+ void setConnections(RawStore rdbms, RawStore hbase) {
+ rdbmsStore.set(rdbms);
+ hbaseStore.set(hbase);
+ rdbmsConf = rdbms.getConf();
+ hbaseConf = hbase.getConf();
+ }
- return new HBaseImport();
+ private static class PartQueueEntry {
+ final String dbName;
+ final String tableName;
+ final List<String> partNames;
+
+ PartQueueEntry(String d, String t, List<String> p) {
+ dbName = d;
+ tableName = t;
+ partNames = p;
+ }
}
}
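
The batching arithmetic in PartitionReader above is the piece the parallelOdd test exercises: ceil(n / batchSize) batches, with a short final batch whenever batchSize does not divide n. The same logic isolated as a sketch (the method and class names are ours; the patch also short-circuits the size <= batchSize case, which this arithmetic covers anyway):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchSketch {
      // Mirrors PartitionReader's computation of how many batches to emit.
      static List<List<String>> toBatches(List<String> partNames, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        int goUntil = partNames.size() % batchSize == 0
            ? partNames.size() / batchSize
            : partNames.size() / batchSize + 1;
        for (int i = 0; i < goUntil; i++) {
          int start = i * batchSize;
          int end = Math.min((i + 1) * batchSize, partNames.size());
          batches.add(partNames.subList(start, end));
        }
        return batches;
      }

      public static void main(String[] args) {
        // 9 partition names in batches of 2 -> sizes 2, 2, 2, 2, 1 (the parallelOdd case).
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 9; i++) names.add("p" + i);
        System.out.println(toBatches(names, 2));
      }
    }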