Posted to commits@hive.apache.org by an...@apache.org on 2019/01/08 08:26:18 UTC
[3/3] hive git commit: HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)
HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3ef75ea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3ef75ea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3ef75ea
Branch: refs/heads/master
Commit: b3ef75eaa1e828f8c80d95ea7c32abcd1f000ef4
Parents: 0dbb896
Author: Anishek Agarwal <an...@gmail.com>
Authored: Tue Jan 8 13:56:02 2019 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Tue Jan 8 13:56:02 2019 +0530
----------------------------------------------------------------------
.../apache/hadoop/hive/common/FileUtils.java | 2 +
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../parse/BaseReplicationAcrossInstances.java | 83 ++++
.../TestReplTableMigrationWithJsonFormat.java | 29 ++
.../hive/ql/parse/TestReplicationScenarios.java | 4 +-
...TestReplicationScenariosAcrossInstances.java | 200 ++-------
.../TestReplicationScenariosExternalTables.java | 420 +++++++++++++++++++
...ationScenariosIncrementalLoadAcidTables.java | 3 -
.../TestReplicationScenariosMigration.java | 33 --
.../TestReplicationWithTableMigration.java | 235 +++++++----
.../hadoop/hive/ql/parse/WarehouseInstance.java | 81 +---
.../java/org/apache/hadoop/hive/ql/Context.java | 5 +-
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 5 +
.../exec/repl/ExternalTableCopyTaskBuilder.java | 150 +++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 96 +++--
.../hive/ql/exec/repl/ReplExternalTables.java | 272 ++++++++++++
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 53 ++-
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 19 +-
.../filesystem/BootstrapEventsIterator.java | 1 +
.../filesystem/DatabaseEventsIterator.java | 6 +
.../events/filesystem/FSTableEvent.java | 44 +-
.../bootstrap/load/table/LoadPartitions.java | 91 ++--
.../repl/bootstrap/load/table/LoadTable.java | 73 +++-
.../IncrementalLoadTasksBuilder.java | 15 +-
.../hive/ql/exec/repl/util/ReplUtils.java | 3 +-
.../apache/hadoop/hive/ql/metadata/Table.java | 2 +-
.../hive/ql/parse/BaseSemanticAnalyzer.java | 3 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 116 +++--
.../ql/parse/ReplicationSemanticAnalyzer.java | 34 +-
.../hadoop/hive/ql/parse/ReplicationSpec.java | 9 +
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 2 +-
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 7 +-
.../repl/dump/events/DropTableHandler.java | 3 +-
.../parse/repl/dump/events/InsertHandler.java | 4 +
.../parse/repl/dump/io/PartitionSerializer.java | 4 -
.../ql/parse/repl/dump/io/TableSerializer.java | 29 +-
.../hive/ql/parse/repl/load/MetadataJson.java | 10 +-
.../parse/repl/load/message/InsertHandler.java | 24 ++
.../parse/repl/load/message/MessageHandler.java | 4 +
.../parse/repl/load/message/TableHandler.java | 103 ++++-
.../hive/ql/exec/repl/TestReplDumpTask.java | 13 +-
.../queries/clientpositive/repl_2_exim_basic.q | 1 -
.../clientpositive/repl_2_exim_basic.q.out | 3 +-
.../ptest2/conf/deployed/master-mr2.properties | 2 +-
44 files changed, 1711 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 56748fd..23a3a6b 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -638,6 +638,8 @@ public final class FileUtils {
public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
boolean deleteSource, String doAsUser,
HiveConf conf, HadoopShims shims) throws IOException {
+ LOG.debug("copying srcPaths : {}, to DestPath :{} ,with doAs: {}",
+ StringUtils.join(",", srcPaths), dst.toString(), doAsUser);
boolean copied = false;
if (doAsUser == null){
copied = shims.runDistCp(srcPaths, dst, conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6a7c4ab..b213609 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -502,6 +502,10 @@ public class HiveConf extends Configuration {
+ " Schemes of the file system which does not support atomic move (rename) can be specified here to \n "
+ " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n"
+ " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
+ REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", "/",
+ "This is the base directory on the target/replica warehouse under which data for "
+ + "external tables is stored. It is a relative base path that is prefixed to the source "
+ + "external table path on the target cluster."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
new file mode 100644
index 0000000..d321cca
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
+public class BaseReplicationAcrossInstances {
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected static final Logger LOG = LoggerFactory.getLogger(BaseReplicationAcrossInstances.class);
+ static WarehouseInstance primary;
+ static WarehouseInstance replica;
+ String primaryDbName, replicatedDbName;
+ static HiveConf conf;
+
+ static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
+ throws Exception {
+ conf = new HiveConf(clazz);
+ conf.set("dfs.client.use.datanode.hostname", "true");
+ conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+ MiniDFSCluster miniDFSCluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ Map<String, String> localOverrides = new HashMap<String, String>() {{
+ put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+ put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+ }};
+ localOverrides.putAll(overrides);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ }
+
+ @AfterClass
+ public static void classLevelTearDown() throws IOException {
+ primary.close();
+ replica.close();
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+ replicatedDbName = "replicated_" + primaryDbName;
+ primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
+ SOURCE_OF_REPLICATION + "' = '1,2,3')");
+ }
+
+ @After
+ public void tearDown() throws Throwable {
+ primary.run("drop database if exists " + primaryDbName + " cascade");
+ replica.run("drop database if exists " + replicatedDbName + " cascade");
+ }
+
+}
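Subclasses wire themselves up by forwarding their config overrides to internalBeforeClassSetup, as TestReplicationScenariosAcrossInstances and TestReplicationScenariosExternalTables do below. A minimal sketch, with a hypothetical class name:

    public class MyReplicationScenarios extends BaseReplicationAcrossInstances {
      @BeforeClass
      public static void classLevelSetup() throws Exception {
        Map<String, String> overrides = new HashMap<>();
        // any HiveConf/MetastoreConf overrides both warehouse instances should run with
        overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
        internalBeforeClassSetup(overrides, MyReplicationScenarios.class);
      }
    }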
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
new file mode 100644
index 0000000..0151ed0
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.BeforeClass;
+
+import java.util.Collections;
+
+public class TestReplTableMigrationWithJsonFormat extends TestReplicationWithTableMigration {
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ internalBeforeClassSetup(Collections.emptyMap());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 98cbd97..c85a2a4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -91,6 +91,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -393,7 +394,8 @@ public class TestReplicationScenarios {
HiveConf confTemp = new HiveConf();
confTemp.set("hive.repl.enable.move.optimization", "true");
ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
- null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
+ null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
+ Collections.emptyList());
Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
replLoadTask.executeTask(null);
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index b50f9a8..0df99b3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -22,35 +22,29 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
-import org.apache.hadoop.hive.ql.util.DependencyResolver;
-import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.util.DependencyResolver;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import javax.annotation.Nullable;
import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
@@ -59,8 +53,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,63 +62,17 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-
-public class TestReplicationScenariosAcrossInstances {
- @Rule
- public final TestName testName = new TestName();
-
- protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
- static WarehouseInstance primary;
- private static WarehouseInstance replica;
- private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
+public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances {
@BeforeClass
public static void classLevelSetup() throws Exception {
HashMap<String, String> overrides = new HashMap<>();
overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+ UserGroupInformation.getCurrentUser().getUserName());
- internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
- }
-
- static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
- throws Exception {
- conf = new HiveConf(clazz);
- conf.set("dfs.client.use.datanode.hostname", "true");
- conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
- MiniDFSCluster miniDFSCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- Map<String, String> localOverrides = new HashMap<String, String>() {{
- put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
- put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
- }};
- localOverrides.putAll(overrides);
- primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
- replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
- }
-
- @AfterClass
- public static void classLevelTearDown() throws IOException {
- primary.close();
- replica.close();
- }
-
- @Before
- public void setup() throws Throwable {
- primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis();
- replicatedDbName = "replicated_" + primaryDbName;
- primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
- SOURCE_OF_REPLICATION + "' = '1,2,3')");
- }
-
- @After
- public void tearDown() throws Throwable {
- primary.run("drop database if exists " + primaryDbName + " cascade");
- replica.run("drop database if exists " + replicatedDbName + " cascade");
+ internalBeforeClassSetup(overrides, TestReplicationScenariosAcrossInstances.class);
}
@Test
@@ -365,8 +313,10 @@ public class TestReplicationScenariosAcrossInstances {
.dump(primaryDbName, null);
// each table creation itself takes more than one task; given we set a max of 1, we should hit multiple runs.
- replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1);
- replica.load(replicatedDbName, tuple.dumpLocation)
+ List<String> withClause = Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
+
+ replica.load(replicatedDbName, tuple.dumpLocation, withClause)
.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] { "t1", "t2", "t3" })
@@ -433,7 +383,8 @@ public class TestReplicationScenariosAcrossInstances {
.run("create table table2 (a int, city string) partitioned by (country string)")
.run("create table table3 (i int, j int)")
.run("insert into table1 values (1,2)")
- .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
+ .dump(primaryDbName, null,
+ Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
.run("use " + replicatedDbName)
@@ -1181,7 +1132,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("use " + importDbFromReplica)
.run("import table t1 from " + exportPath)
.run("select country from t1")
- .verifyResults(Arrays.asList("india"));
+ .verifyResults(Collections.singletonList("india"));
// Check if table/partition in C doesn't have ckpt property
t1 = replica.getTable(importDbFromReplica, "t1");
@@ -1568,107 +1519,6 @@ public class TestReplicationScenariosAcrossInstances {
.run(" drop database if exists " + replicatedDbName_CM + " cascade");
}
- @Test
- public void testDumpExternalTableSetFalse() throws Throwable {
- WarehouseInstance.Tuple tuple = primary
- .run("use " + primaryDbName)
- .run("create external table t1 (id int)")
- .run("insert into table t1 values (1)")
- .run("insert into table t1 values (2)")
- .run("create external table t2 (place string) partitioned by (country string)")
- .run("insert into table t2 partition(country='india') values ('bangalore')")
- .run("insert into table t2 partition(country='us') values ('austin')")
- .run("insert into table t2 partition(country='france') values ('paris')")
- .dump(primaryDbName, null);
-
- replica.load(replicatedDbName, tuple.dumpLocation)
- .run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("use " + replicatedDbName)
- .run("show tables like 't1'")
- .verifyFailure(new String[] {"t1"})
- .run("show tables like 't2'")
- .verifyFailure(new String[] {"t2"});
-
- tuple = primary.run("use " + primaryDbName)
- .run("create external table t3 (id int)")
- .run("insert into table t3 values (10)")
- .run("insert into table t3 values (20)")
- .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId
- + " with ('hive.repl.dump.metadata.only'='true')");
-
- replica.load(replicatedDbName, tuple.dumpLocation)
- .run("use " + replicatedDbName)
- .run("show tables like 't3'")
- .verifyResult("t3")
- .run("select id from t3 where id = 10")
- .verifyFailure(new String[] {"10"});
- }
-
- @Test
- public void testDumpExternalTableSetTrue() throws Throwable {
- WarehouseInstance.Tuple tuple = primary
- .run("use " + primaryDbName)
- .run("create external table t1 (id int)")
- .run("insert into table t1 values (1)")
- .run("insert into table t1 values (2)")
- .run("create external table t2 (place string) partitioned by (country string)")
- .run("insert into table t2 partition(country='india') values ('bangalore')")
- .run("insert into table t2 partition(country='us') values ('austin')")
- .run("insert into table t2 partition(country='france') values ('paris')")
- .dump("repl dump " + primaryDbName + " with ('hive.repl.include.external.tables'='true')");
-
- replica.load(replicatedDbName, tuple.dumpLocation)
- .run("use " + replicatedDbName)
- .run("show tables like 't1'")
- .verifyResult("t1")
- .run("show tables like 't2'")
- .verifyResult("t2")
- .run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("select country from t2 where country = 'us'")
- .verifyResult("us")
- .run("select country from t2 where country = 'france'")
- .verifyResult("france");
-
- tuple = primary.run("use " + primaryDbName)
- .run("create external table t3 (id int)")
- .run("insert into table t3 values (10)")
- .run("create external table t4 as select id from t3")
- .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId
- + " with ('hive.repl.include.external.tables'='true')");
-
- replica.load(replicatedDbName, tuple.dumpLocation)
- .run("use " + replicatedDbName)
- .run("show tables like 't3'")
- .verifyResult("t3")
- .run("select id from t3")
- .verifyResult("10")
- .run("select id from t4")
- .verifyResult(null); // Returns null as create table event doesn't list files
- }
-
- @Test
- public void testDumpExternalTableWithAddPartitionEvent() throws Throwable {
- WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
-
- replica.load(replicatedDbName, tuple.dumpLocation);
-
- tuple = primary.run("use " + primaryDbName)
- .run("create external table t1 (place string) partitioned by (country string)")
- .run("alter table t1 add partition(country='india')")
- .run("alter table t1 add partition(country='us')")
- .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId
- + " with ('hive.repl.include.external.tables'='true')");
-
- replica.load(replicatedDbName, tuple.dumpLocation)
- .run("use " + replicatedDbName)
- .run("show tables like 't1'")
- .verifyResult("t1")
- .run("show partitions t1")
- .verifyResults(new String[] { "country=india", "country=us" });
- }
-
// This requires the tables are loaded in a fixed sorted order.
@Test
public void testBootstrapLoadRetryAfterFailureForAlterTable() throws Throwable {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
new file mode 100644
index 0000000..0e3cefc
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances {
+
+ private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base";
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ HashMap<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+ overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+ overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+ UserGroupInformation.getCurrentUser().getUserName());
+
+ internalBeforeClassSetup(overrides, TestReplicationScenarios.class);
+ }
+
+ @Test
+ public void replicationWithoutExternalTables() throws Throwable {
+ List<String> loadWithClause = externalTableBasePathWithClause();
+ List<String> dumpWithClause = Collections.singletonList
+ ("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'");
+
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2)")
+ .run("create external table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='us') values ('austin')")
+ .run("insert into table t2 partition(country='france') values ('paris')")
+ .dump(primaryDbName, null, dumpWithClause);
+
+ // the _external_tables_file info should be created only if external tables are to be replicated
+ assertFalse(primary.miniDFSCluster.getFileSystem()
+ .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyFailure(new String[] { "t1" })
+ .run("show tables like 't2'")
+ .verifyFailure(new String[] { "t2" });
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("create external table t3 (id int)")
+ .run("insert into table t3 values (10)")
+ .run("insert into table t3 values (20)")
+ .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause);
+
+ // the _external_tables_file info should be created only if external tables are to be replicated
+ assertFalse(primary.miniDFSCluster.getFileSystem()
+ .exists(new Path(tuple.dumpLocation, FILE_NAME)));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't3'")
+ .verifyFailure(new String[] { "t3" });
+ }
+
+ @Test
+ public void externalTableReplicationWithDefaultPaths() throws Throwable {
+ //creates external tables with partitions
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2)")
+ .run("create external table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='us') values ('austin')")
+ .run("insert into table t2 partition(country='france') values ('paris')")
+ .dump("repl dump " + primaryDbName);
+
+ // verify that the external table info is written correctly for bootstrap
+ assertExternalFileInfo(Arrays.asList("t1", "t2"),
+ new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
+
+ List<String> withClauseOptions = externalTableBasePathWithClause();
+
+ replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("select country from t2 where country = 'us'")
+ .verifyResult("us")
+ .run("select country from t2 where country = 'france'")
+ .verifyResult("france");
+
+ assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+ assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("create external table t3 (id int)")
+ .run("insert into table t3 values (10)")
+ .run("create external table t4 as select id from t3")
+ .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId);
+
+ // verify that the external table info is written correctly for incremental
+ assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"),
+ new Path(tuple.dumpLocation, FILE_NAME));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't3'")
+ .verifyResult("t3")
+ .run("select id from t3")
+ .verifyResult("10")
+ .run("select id from t4")
+ .verifyResult("10");
+
+ assertTablePartitionLocation(primaryDbName + ".t3", replicatedDbName + ".t3");
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("drop table t1")
+ .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId);
+
+ // verify that the external table info is written correctly for incremental
+ assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"),
+ new Path(tuple.dumpLocation, FILE_NAME));
+ }
+
+ /**
+ * @param sourceTableName -- Provide the fully qualified table name
+ * @param replicaTableName -- Provide the fully qualified table name
+ */
+ private void assertTablePartitionLocation(String sourceTableName, String replicaTableName)
+ throws HiveException {
+ Hive hiveForPrimary = Hive.get(primary.hiveConf);
+ Table sourceTable = hiveForPrimary.getTable(sourceTableName);
+ Path sourceLocation = sourceTable.getDataLocation();
+ Hive hiveForReplica = Hive.get(replica.hiveConf);
+ Table replicaTable = hiveForReplica.getTable(replicaTableName);
+ Path dataLocation = replicaTable.getDataLocation();
+ assertEquals(REPLICA_EXTERNAL_BASE + sourceLocation.toUri().getPath(),
+ dataLocation.toUri().getPath());
+ if (sourceTable.isPartitioned()) {
+ Set<Partition> sourcePartitions = hiveForPrimary.getAllPartitionsOf(sourceTable);
+ Set<Partition> replicaPartitions = hiveForReplica.getAllPartitionsOf(replicaTable);
+ assertEquals(sourcePartitions.size(), replicaPartitions.size());
+ List<String> expectedPaths =
+ sourcePartitions.stream()
+ .map(p -> REPLICA_EXTERNAL_BASE + p.getDataLocation().toUri().getPath())
+ .collect(Collectors.toList());
+ List<String> actualPaths =
+ replicaPartitions.stream()
+ .map(p -> p.getDataLocation().toUri().getPath())
+ .collect(Collectors.toList());
+ assertTrue(expectedPaths.containsAll(actualPaths));
+ }
+ }
+
+ @Test
+ public void externalTableReplicationWithCustomPaths() throws Throwable {
+ Path externalTableLocation =
+ new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+ DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+ List<String> loadWithClause = externalTableBasePathWithClause();
+
+ WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
+ .run("create external table a (i int, j int) "
+ + "row format delimited fields terminated by ',' "
+ + "location '" + externalTableLocation.toUri() + "'")
+ .dump(primaryDbName, null);
+
+ replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 'a'")
+ .verifyResults(Collections.singletonList("a"))
+ .run("select * From a").verifyResults(Collections.emptyList());
+
+ assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
+
+ //externally add data to location
+ try (FSDataOutputStream outputStream =
+ fs.create(new Path(externalTableLocation, "file1.txt"))) {
+ outputStream.write("1,2\n".getBytes());
+ outputStream.write("13,21\n".getBytes());
+ }
+
+ WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+ .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+ replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+ .run("select i From a")
+ .verifyResults(new String[] { "1", "13" })
+ .run("select j from a")
+ .verifyResults(new String[] { "2", "21" });
+
+ // alter table location to something new.
+ externalTableLocation =
+ new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
+ incrementalTuple = primary.run("use " + primaryDbName)
+ .run("alter table a set location '" + externalTableLocation + "'")
+ .dump(primaryDbName, incrementalTuple.lastReplicationId);
+
+ replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("select i From a")
+ .verifyResults(Collections.emptyList());
+ assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
+ }
+
+ @Test
+ public void externalTableWithPartitions() throws Throwable {
+ Path externalTableLocation =
+ new Path("/" + testName.getMethodName() + "/t2/");
+ DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+ List<String> loadWithClause = externalTableBasePathWithClause();
+
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t2 (place string) partitioned by (country string) row format "
+ + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+ + "'")
+ .run("insert into t2 partition(country='india') values ('bangalore')")
+ .dump("repl dump " + primaryDbName);
+
+ assertExternalFileInfo(Collections.singletonList("t2"),
+ new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't2'")
+ .verifyResults(new String[] { "t2" })
+ .run("select place from t2")
+ .verifyResults(new String[] { "bangalore" });
+
+ assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+ // add new data externally, to a partition, but under the table level top directory
+ Path partitionDir = new Path(externalTableLocation, "country=india");
+ try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
+ outputStream.write("pune\n".getBytes());
+ outputStream.write("mumbai\n".getBytes());
+ }
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("insert into t2 partition(country='australia') values ('sydney')")
+ .dump(primaryDbName, tuple.lastReplicationId);
+
+ assertExternalFileInfo(Collections.singletonList("t2"),
+ new Path(tuple.dumpLocation, FILE_NAME));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("select distinct(country) from t2")
+ .verifyResults(new String[] { "india", "australia" })
+ .run("select place from t2 where country='india'")
+ .verifyResults(new String[] { "bangalore", "pune", "mumbai" })
+ .run("select place from t2 where country='australia'")
+ .verifyResults(new String[] { "sydney" });
+
+ Path customPartitionLocation =
+ new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france");
+ fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+ // add new partitions to the table, at an external location other than the table level directory
+ try (FSDataOutputStream outputStream = fs
+ .create(new Path(customPartitionLocation, "file.txt"))) {
+ outputStream.write("paris".getBytes());
+ }
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
+ .toString() + "'")
+ .dump(primaryDbName, tuple.lastReplicationId);
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("select place from t2 where country='france'")
+ .verifyResults(new String[] { "paris" });
+
+ // change the location of the partition via alter command
+ String tmpLocation = "/tmp/" + System.nanoTime();
+ primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777"));
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
+ .dump(primaryDbName, tuple.lastReplicationId);
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("select place from t2 where country='france'")
+ .verifyResults(new String[] {});
+ }
+
+ @Test
+ public void externalTableIncrementalReplication() throws Throwable {
+ WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
+ replica.load(replicatedDbName, tuple.dumpLocation);
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (place string) partitioned by (country string)")
+ .run("alter table t1 add partition(country='india')")
+ .run("alter table t1 add partition(country='us')")
+ .dump(primaryDbName, tuple.lastReplicationId);
+
+ List<String> loadWithClause = externalTableBasePathWithClause();
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show partitions t1")
+ .verifyResults(new String[] { "country=india", "country=us" });
+
+ Hive hive = Hive.get(replica.getConf());
+ Set<Partition> partitions =
+ hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1"));
+ List<String> paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath())
+ .collect(Collectors.toList());
+
+ tuple = primary
+ .run("alter table t1 drop partition (country='india')")
+ .run("alter table t1 drop partition (country='us')")
+ .dump(primaryDbName, tuple.lastReplicationId);
+
+ replica.load(replicatedDbName, tuple.dumpLocation)
+ .run("select * From t1")
+ .verifyResults(new String[] {});
+
+ for (String path : paths) {
+ assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
+ }
+
+ }
+
+ private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
+ Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
+ DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
+ externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem);
+ fileSystem.mkdirs(externalTableLocation);
+
+ // this is required since the same filesystem is used in both source and target
+ return Collections.singletonList(
+ "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+ + externalTableLocation.toString() + "'"
+ );
+ }
+
+ private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile)
+ throws IOException {
+ DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
+ assertTrue(fileSystem.exists(externalTableInfoFile));
+ InputStream inputStream = fileSystem.open(externalTableInfoFile);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+ Set<String> tableNames = new HashSet<>();
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ String[] components = line.split(",");
+ assertEquals("The file should have tableName,base64encoded(data_location)",
+ 2, components.length);
+ tableNames.add(components[0]);
+ assertTrue(components[1].length() > 0);
+ }
+ assertTrue(expected.containsAll(tableNames));
+ reader.close();
+ }
+}
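assertExternalFileInfo above only inspects the first column; per its assertion message, each line of _external_tables_file is "tableName,base64encoded(data_location)". A sketch of decoding the second column (the helper name is hypothetical, and UTF-8 for the path bytes is an assumption):

    static String decodeDataLocation(String line) {
      String[] components = line.split(",");
      // components[0] is the table name, components[1] the base64-encoded data location
      byte[] decoded = java.util.Base64.getDecoder().decode(components[1]);
      return new String(decoded, java.nio.charset.StandardCharsets.UTF_8);
    }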
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 97775b3..5529d9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -22,10 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils;
-
import org.junit.rules.TestName;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java
deleted file mode 100644
index 5b8e424..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse;
-
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
-import java.util.HashMap;
-import org.junit.BeforeClass;
-
-public class TestReplicationScenariosMigration extends org.apache.hadoop.hive.ql.parse.TestReplicationScenarios {
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- HashMap<String, String> overrideProperties = new HashMap<>();
- overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
- GzipJSONMessageEncoder.class.getCanonicalName());
- internalBeforeClassSetup(overrideProperties, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index ec64f4b..58561d4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -17,19 +17,24 @@
*/
package org.apache.hadoop.hive.ql.parse;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +52,6 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.junit.rules.TestName;
-import com.google.common.collect.Lists;
import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;
import static org.junit.Assert.assertEquals;
@@ -58,38 +62,48 @@ import static org.junit.Assert.assertTrue;
* TestReplicationWithTableMigration - test replication for Hive2 to Hive3 (Strict managed tables)
*/
public class TestReplicationWithTableMigration {
+ private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";
+
@Rule
public final TestName testName = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigration.class);
private static WarehouseInstance primary, replica;
private String primaryDbName, replicatedDbName;
- private static HiveConf conf;
+ private Path avroSchemaFile = null;
@BeforeClass
public static void classLevelSetup() throws Exception {
- conf = new HiveConf(TestReplicationWithTableMigration.class);
+ HashMap<String, String> overrideProperties = new HashMap<>();
+ overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+ internalBeforeClassSetup(overrideProperties);
+ }
+
+ static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws Exception {
+ HiveConf conf = new HiveConf(TestReplicationWithTableMigration.class);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
- HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
- put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
- put("hive.support.concurrency", "true");
- put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.metastore.client.capability.check", "false");
- put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
- put("hive.exec.dynamic.partition.mode", "nonstrict");
- put("hive.strict.checks.bucketing", "false");
- put("hive.mapred.mode", "nonstrict");
- put("mapred.input.dir.recursive", "true");
- put("hive.metastore.disallow.incompatible.col.type.changes", "false");
- put("hive.strict.managed.tables", "true");
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ final DistributedFileSystem fs = miniDFSCluster.getFileSystem();
+ HashMap<String, String> hiveConfigs = new HashMap<String, String>() {{
+ put("fs.defaultFS", fs.getUri().toString());
+ put("hive.support.concurrency", "true");
+ put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+ put("hive.metastore.client.capability.check", "false");
+ put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+ put("hive.strict.managed.tables", "true");
}};
- replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+ replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
- HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
- put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+ HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
+ put("fs.defaultFS", fs.getUri().toString());
put("hive.metastore.client.capability.check", "false");
put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
put("hive.exec.dynamic.partition.mode", "nonstrict");
@@ -101,7 +115,40 @@ public class TestReplicationWithTableMigration {
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
put("hive.strict.managed.tables", "false");
}};
- primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
+ configsForPrimary.putAll(overrideConfigs);
+ primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+ }
+
+ private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
+ Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
+ String[] schemaVals = new String[] { "{",
+ " \"type\" : \"record\",",
+ " \"name\" : \"table1\",",
+ " \"doc\" : \"Sqoop import of table1\",",
+ " \"fields\" : [ {",
+ " \"name\" : \"col1\",",
+ " \"type\" : [ \"null\", \"string\" ],",
+ " \"default\" : null,",
+ " \"columnName\" : \"col1\",",
+ " \"sqlType\" : \"12\"",
+ " }, {",
+ " \"name\" : \"col2\",",
+ " \"type\" : [ \"null\", \"long\" ],",
+ " \"default\" : null,",
+ " \"columnName\" : \"col2\",",
+ " \"sqlType\" : \"13\"",
+ " } ],",
+ " \"tableName\" : \"table1\"",
+ "}"
+ };
+
+ try (FSDataOutputStream stream = fs.create(schemaFile)) {
+ for (String line : schemaVals) {
+ stream.write((line + "\n").getBytes());
+ }
+ }
+ fs.deleteOnExit(schemaFile);
+ return schemaFile;
}
@AfterClass
@@ -116,6 +163,12 @@ public class TestReplicationWithTableMigration {
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')");
+ if (avroSchemaFile == null) {
+ Path testPath = new Path("/tmp/avro_schema/definition/" + System.nanoTime());
+ DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.mkdirs(testPath, new FsPermission("777"));
+ avroSchemaFile = PathBuilder.fullyQualifiedHDFSUri(createAvroSchemaFile(fs, testPath), fs);
+ }
}
@After
@@ -125,39 +178,52 @@ public class TestReplicationWithTableMigration {
}
private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable {
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
- .run("insert into tacid values(1)")
- .run("insert into tacid values(2)")
- .run("insert into tacid values(3)")
- .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " +
- "into 3 buckets stored as orc ")
- .run("alter table tacidpart add partition(country='france')")
- .run("insert into tacidpart partition(country='india') values('mumbai')")
- .run("insert into tacidpart partition(country='us') values('sf')")
- .run("insert into tacidpart partition(country='france') values('paris')")
- .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")")
- .run("insert into tflat values(11)")
- .run("insert into tflat values(22)")
- .run("create table tflattext (id int) ")
- .run("insert into tflattext values(111), (222)")
- .run("create table tflattextpart (id int) partitioned by (country string) ")
- .run("insert into tflattextpart partition(country='india') values(1111), (2222)")
- .run("insert into tflattextpart partition(country='us') values(3333)")
- .run("create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp' ")
- .run("insert into tacidloc values(1)")
- .run("insert into tacidloc values(2)")
- .run("insert into tacidloc values(3)")
- .run("create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " +
- "into 3 buckets stored as orc ")
- .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'")
- .run("insert into tacidpartloc partition(country='india') values('mumbai')")
- .run("insert into tacidpartloc partition(country='us') values('sf')")
- .run("insert into tacidpartloc partition(country='france') values('paris')")
- .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " +
- "stored as avro tblproperties ('avro.schema.url'='" + primary.avroSchemaFile.toUri().toString() + "')")
- .run("insert into avro_table values('str1', 10)")
- .dump(primaryDbName, fromReplId);
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+ .run("insert into tacid values(1)")
+ .run("insert into tacid values(2)")
+ .run("insert into tacid values(3)")
+ .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " +
+ "into 3 buckets stored as orc ")
+ .run("alter table tacidpart add partition(country='france')")
+ .run("insert into tacidpart partition(country='india') values('mumbai')")
+ .run("insert into tacidpart partition(country='us') values('sf')")
+ .run("insert into tacidpart partition(country='france') values('paris')")
+ .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")")
+ .run("insert into tflat values(11)")
+ .run("insert into tflat values(22)")
+ .run("create table tflattext (id int) ")
+ .run("insert into tflattext values(111), (222)")
+ .run("create table tflattextpart (id int) partitioned by (country string) ")
+ .run("insert into tflattextpart partition(country='india') values(1111), (2222)")
+ .run("insert into tflattextpart partition(country='us') values(3333)")
+ .run("create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp' ")
+ .run("insert into tacidloc values(1)")
+ .run("insert into tacidloc values(2)")
+ .run("insert into tacidloc values(3)")
+ .run("create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " +
+ "into 3 buckets stored as orc ")
+ .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'")
+ .run("insert into tacidpartloc partition(country='india') values('mumbai')")
+ .run("insert into tacidpartloc partition(country='us') values('sf')")
+ .run("insert into tacidpartloc partition(country='france') values('paris')")
+ .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " +
+ "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
+ .run("insert into avro_table values ('str1', 10)")
+ .run("create table avro_table_part partitioned by (country string) ROW FORMAT SERDE " +
+ "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro " +
+ "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
+ .run("insert into avro_table_part partition (country='india') values ('another', 13)")
+ .dump(primaryDbName, fromReplId);
assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid")));
assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart")));
assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflat")));
@@ -165,17 +231,24 @@ public class TestReplicationWithTableMigration {
assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart")));
assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc")));
assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc")));
- Table avroTable = primary.getTable(primaryDbName, "avro_table");
- assertFalse(isTransactionalTable(avroTable));
- assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+ assertAvroTableState(primaryDbName, "avro_table", "avro_table_part");
return tuple;
}
+ private void assertAvroTableState(String primaryDbName, String... tableNames) throws Exception {
+ for (String tableName : tableNames) {
+ Table avroTable = primary.getTable(primaryDbName, tableName);
+ assertFalse(isTransactionalTable(avroTable));
+ assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+ }
+ }
+
private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
replica.run("use " + replicatedDbName)
.run("show tables")
.verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart",
- "tacidloc", "tacidpartloc", "avro_table"})
+ "tacidloc", "tacidpartloc", "avro_table", "avro_table_part" })
.run("repl status " + replicatedDbName)
.verifyResult(lastReplId)
.run("select id from tacid order by id")
@@ -193,7 +266,9 @@ public class TestReplicationWithTableMigration {
.run("select country from tacidpartloc order by country")
.verifyResults(new String[] {"france", "india", "us"})
.run("select col1 from avro_table")
- .verifyResults(new String[] {"str1"});
+ .verifyResults(new String[] { "str1" })
+ .run("select col1 from avro_table_part")
+ .verifyResults(new String[] { "another" });
assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart")));
@@ -204,23 +279,29 @@ public class TestReplicationWithTableMigration {
assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattextpart")));
assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidloc")));
assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpartloc")));
+ assertTablePath(replicatedDbName, "avro_table");
+ assertPartitionPath(replicatedDbName, "avro_table_part");
+ }
- /*Path databasePath = new Path(replica.warehouseRoot, replica.getDatabase(replicatedDbName).getLocationUri());
- assertEquals(replica.getTable(replicatedDbName, "tacidloc").getSd().getLocation(),
- new Path(databasePath,"tacidloc").toUri().toString());
-
- Path tablePath = new Path(databasePath, "tacidpartloc");
- List<Partition> partitions = replica.getAllPartitions(replicatedDbName, "tacidpartloc");
- for (Partition part : partitions) {
- tablePath.equals(new Path(part.getSd().getLocation()).getParent());
- }*/
+ private void assertPartitionPath(String replicatedDbName, String tableName) throws Exception {
+ Path tablePath = assertTablePath(replicatedDbName, tableName);
+ List<Partition> partitions = replica.getAllPartitions(replicatedDbName, tableName);
+ assertEquals(1, partitions.size());
+ String actualPartitionPath = partitions.iterator().next().getSd().getLocation().toLowerCase();
+ String expectedPartitionPath = new PathBuilder(tablePath.toString())
+ .addDescendant("country=india").build().toUri().toString().toLowerCase();
+ assertEquals(expectedPartitionPath, actualPartitionPath);
+ }
- Table avroTable = replica.getTable(replicatedDbName, "avro_table");
+ private Path assertTablePath(String replicatedDbName, String tableName) throws Exception {
+ Table avroTable = replica.getTable(replicatedDbName, tableName);
assertTrue(MetaStoreUtils.isExternalTable(avroTable));
- Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db")
- .addDescendant("avro_table")
- .build();
- assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase());
+ Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString())
+ .addDescendant(replicatedDbName + ".db").addDescendant(tableName).build();
+ String expectedTablePath = tablePath.toUri().toString().toLowerCase();
+ String actualTablePath = avroTable.getSd().getLocation().toLowerCase();
+ assertEquals(expectedTablePath, actualTablePath);
+ return tablePath;
}
private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
@@ -231,12 +312,12 @@ public class TestReplicationWithTableMigration {
public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) {
injectionPathCalled = true;
if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
- LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
- + " Constraint Table: " + String.valueOf(args.constraintTblName));
+ LOG.warn("Verifier - DB: " + args.dbName
+ + " Constraint Table: " + args.constraintTblName);
return false;
}
if (args.tblName != null) {
- LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+ LOG.warn("Verifier - Table: " + args.tblName);
return args.tblName.equalsIgnoreCase(tbl);
}
return true;
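The verifier above hooks into the metastore through InjectableBehaviourObjectStore. As a rough sketch of how such a verifier is installed and checked (the setCallerVerifier/resetCallerVerifier hooks and the loadFailure helper follow this test suite's conventions; treat the exact signatures as assumptions):

    // install the verifier so create-table calls for other tables are rejected,
    // forcing the load to fail inside the add-notification step
    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
    try {
      replica.loadFailure(replicatedDbName, dumpLocation);
    } finally {
      // always restore default metastore behaviour for subsequent tests
      InjectableBehaviourObjectStore.resetCallerVerifier();
    }
    assertTrue(injectionPathCalled);  // the verifier must actually have been invoked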
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 92f2456..bf4154c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -55,11 +55,8 @@ import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
import org.codehaus.plexus.util.ExceptionUtils;
import org.slf4j.Logger;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import java.io.Closeable;
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -75,20 +72,18 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class WarehouseInstance implements Closeable {
- final String functionsRoot;
+ final String functionsRoot;
+ final String repldDir;
private Logger logger;
private IDriver driver;
HiveConf hiveConf;
MiniDFSCluster miniDFSCluster;
private HiveMetaStoreClient client;
- public final Path warehouseRoot;
- public final Path externalTableWarehouseRoot;
- public Path avroSchemaFile;
+ final Path warehouseRoot;
+ final Path externalTableWarehouseRoot;
private static int uniqueIdentifier = 0;
private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
- private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";
WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf,
String keyNameForEncryptedZone) throws Exception {
@@ -106,8 +101,14 @@ public class WarehouseInstance implements Closeable {
}
Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
- initialize(cmRootPath.toString(), warehouseRoot.toString(), externalTableWarehouseRoot.toString(),
- overridesForHiveConf);
+ String tmpDir = "/tmp/"
+ + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
+ + "_"
+ + System.nanoTime();
+
+ this.repldDir = mkDir(fs, tmpDir + "/hrepl" + uniqueIdentifier + "/").toString();
+ initialize(cmRootPath.toString(), externalTableWarehouseRoot.toString(),
+ warehouseRoot.toString(), overridesForHiveConf);
}
WarehouseInstance(Logger logger, MiniDFSCluster cluster,
@@ -115,18 +116,13 @@ public class WarehouseInstance implements Closeable {
this(logger, cluster, overridesForHiveConf, null);
}
- private void initialize(String cmRoot, String warehouseRoot, String externalTableWarehouseRoot,
+ private void initialize(String cmRoot, String externalTableWarehouseRoot, String warehouseRoot,
Map<String, String> overridesForHiveConf) throws Exception {
hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
hiveConf.set(entry.getKey(), entry.getValue());
}
String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
- String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
- + Path.SEPARATOR
- + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
- + "_"
- + System.nanoTime();
if (metaStoreUri != null) {
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
return;
@@ -143,8 +139,7 @@ public class WarehouseInstance implements Closeable {
hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
"jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
- hiveConf.setVar(HiveConf.ConfVars.REPLDIR,
- hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
+ hiveConf.setVar(HiveConf.ConfVars.REPLDIR, this.repldDir);
hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -158,11 +153,6 @@ public class WarehouseInstance implements Closeable {
MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);
- Path testPath = new Path(hiveWarehouseLocation);
- FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf);
- testPathFileSystem.mkdirs(testPath);
-
- avroSchemaFile = createAvroSchemaFile(testPathFileSystem, testPath);
driver = DriverFactory.newDriver(hiveConf);
SessionState.start(new CliSessionState(hiveConf));
client = new HiveMetaStoreClient(hiveConf);
@@ -177,53 +167,10 @@ public class WarehouseInstance implements Closeable {
private Path mkDir(DistributedFileSystem fs, String pathString)
throws IOException, SemanticException {
Path path = new Path(pathString);
- fs.mkdir(path, new FsPermission("777"));
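+ // mkdirs creates any missing parent directories; DistributedFileSystem.mkdir is
+ // non-recursive and would fail now that the repl dir lives under a nested /tmp path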
+ fs.mkdirs(path, new FsPermission("777"));
return PathBuilder.fullyQualifiedHDFSUri(path, fs);
}
- private Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
- Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
- String[] schemaVals = new String[] { "{",
- " \"type\" : \"record\",",
- " \"name\" : \"table1\",",
- " \"doc\" : \"Sqoop import of table1\",",
- " \"fields\" : [ {",
- " \"name\" : \"col1\",",
- " \"type\" : [ \"null\", \"string\" ],",
- " \"default\" : null,",
- " \"columnName\" : \"col1\",",
- " \"sqlType\" : \"12\"",
- " }, {",
- " \"name\" : \"col2\",",
- " \"type\" : [ \"null\", \"long\" ],",
- " \"default\" : null,",
- " \"columnName\" : \"col2\",",
- " \"sqlType\" : \"13\"",
- " } ],",
- " \"tableName\" : \"table1\"",
- "}"
- };
- createTestDataFile(schemaFile.toUri().getPath(), schemaVals);
- return schemaFile;
- }
-
- private void createTestDataFile(String filename, String[] lines) throws IOException {
- FileWriter writer = null;
- try {
- File file = new File(filename);
- file.deleteOnExit();
- writer = new FileWriter(file);
- int i=0;
- for (String line : lines) {
- writer.write(line + "\n");
- }
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
- }
-
public HiveConf getConf() {
return hiveConf;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index aabc34d..18089d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -702,14 +702,13 @@ public class Context {
*/
public Path getExternalTmpPath(Path path) {
URI extURI = path.toUri();
- if (extURI.getScheme().equals("viewfs")) {
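+ // constant-first equals avoids an NPE: getScheme() returns null for scheme-less paths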
+ if ("viewfs".equals(extURI.getScheme())) {
// if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/..
// to final /user/hive/warehouse/ will fail later, so instead pick tmp dir
// on same namespace as tbl dir.
return getExtTmpPathRelTo(path.getParent());
}
- return new Path(getExternalScratchDir(extURI), EXT_PREFIX +
- nextPathId());
+ return new Path(getExternalScratchDir(extURI), EXT_PREFIX + nextPathId());
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 47a802f..40cc576 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.hive.ql.plan.TezWork;
import com.google.common.annotations.VisibleForTesting;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask;
+
/**
* TaskFactory implementation.
**/
@@ -113,6 +117,7 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
+ taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
}
private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
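With the DirCopyWork/DirCopyTask tuple registered, TaskFactory can build the copy task from the work unit alone. A minimal sketch, assuming any valid HiveConf in scope:

    DirCopyWork copyWork = new DirCopyWork(fullyQualifiedSourcePath, fullyQualifiedTargetPath);
    Task<DirCopyWork> copyTask = TaskFactory.get(copyWork, conf);  // resolved via the taskvec entry above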
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
new file mode 100644
index 0000000..efecdb8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class ExternalTableCopyTaskBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class);
+ private final ReplLoadWork work;
+ private final HiveConf conf;
+
+ ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) {
+ this.work = work;
+ this.conf = conf;
+ }
+
+ List<Task<? extends Serializable>> tasks(TaskTracker tracker) {
+ List<Task<? extends Serializable>> tasks = new ArrayList<>();
+ Iterator<DirCopyWork> itr = work.getPathsToCopyIterator();
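+ // the tracker caps how many tasks one repl-load cycle may create; paths left on
+ // the iterator are picked up by the next cycle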
+ while (tracker.canAddMoreTasks() && itr.hasNext()) {
+ DirCopyWork dirCopyWork = itr.next();
+ Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+ tasks.add(task);
+ tracker.addTask(task);
+ LOG.debug("added task for {}", dirCopyWork);
+ }
+ return tasks;
+ }
+
+ public static class DirCopyTask extends Task<DirCopyWork> implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
+ private static final int MAX_COPY_RETRY = 5;
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+ Path sourcePath = work.fullyQualifiedSourcePath;
+ Path targetPath = work.fullyQualifiedTargetPath;
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+ sourcePath = reservedRawPath(work.fullyQualifiedSourcePath.toUri());
+ targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
+ }
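+ // retry the whole distcp up to MAX_COPY_RETRY times; only the final failure fails the task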
+ int currentRetry = 0;
+ while (currentRetry < MAX_COPY_RETRY) {
+ try {
+ UserGroupInformation ugi = Utils.getUGI();
+ String currentUser = ugi.getShortUserName();
+ boolean usePrivilegedUser =
+ distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
+
+ // TODO: should we build a separate conf here that sets distcp.options.delete, so
+ // the target directory is kept in exact sync with the source instead of the two
+ // locations diverging over time?
+ FileUtils.distCp(
+ sourcePath.getFileSystem(conf), // source file system
+ Collections.singletonList(sourcePath), // list of source paths
+ targetPath,
+ false,
+ usePrivilegedUser ? distCpDoAsUser : null,
+ conf,
+ ShimLoader.getHadoopShims());
+ return 0;
+ } catch (Exception e) {
+ if (++currentRetry < MAX_COPY_RETRY) {
+ LOG.warn("unable to copy", e);
+ } else {
+ LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ }
+ }
+ LOG.error("should never come here ");
+ return -1;
+ }
+
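+ // /.reserved/raw is the HDFS virtual namespace that exposes raw (still-encrypted)
+ // bytes, letting distcp copy across encryption zones without decrypting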
+ private static Path reservedRawPath(URI uri) {
+ return new Path(uri.getScheme(), uri.getAuthority(),
+ CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.REPL_INCREMENTAL_LOAD;
+ }
+
+ @Override
+ public String getName() {
+ return "DIR_COPY_TASK";
+ }
+ }
+
+ @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
+ Explain.Level.DEFAULT,
+ Explain.Level.EXTENDED })
+ public static class DirCopyWork implements Serializable {
+ private final Path fullyQualifiedSourcePath, fullyQualifiedTargetPath;
+
+ public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) {
+ this.fullyQualifiedSourcePath = fullyQualifiedSourcePath;
+ this.fullyQualifiedTargetPath = fullyQualifiedTargetPath;
+ }
+
+ @Override
+ public String toString() {
+ return "DirCopyWork{" +
+ "fullyQualifiedSourcePath=" + fullyQualifiedSourcePath +
+ ", fullyQualifiedTargetPath=" + fullyQualifiedTargetPath +
+ '}';
+ }
+ }
+}
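For context, a sketch of how the builder is expected to be driven from repl load; the TaskTracker limit sourced from REPL_APPROX_MAX_LOAD_TASKS is an assumption about the caller's configuration:

    TaskTracker tracker =
        new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
    // emits at most tracker-many DirCopyTasks; unconsumed paths remain on the
    // work's iterator for the next load cycle
    List<Task<? extends Serializable>> copyTasks =
        new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker);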