You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2019/02/15 04:27:51 UTC
[hive] branch master updated: HIVE-21269: Mandate -update and
-delete as DistCp options to sync data files for external tables
replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 02d3551 HIVE-21269: Mandate -update and -delete as DistCp options to sync data files for external tables replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
02d3551 is described below
commit 02d35510b93068aa42020fe756eeec9496880959
Author: Sankar Hariappan <sa...@apache.org>
AuthorDate: Fri Feb 15 09:56:28 2019 +0530
HIVE-21269: Mandate -update and -delete as DistCp options to sync data files for external tables replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../TestReplicationScenariosExternalTables.java | 60 +++++++++++++++++++---
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 25 ++++++---
.../hadoop/hive/shims/TestHadoop23Shims.java | 53 +++++++++----------
3 files changed, 97 insertions(+), 41 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 81af2fe..2cdc35f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -220,9 +220,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
// Create base directory but use HDFS path without schema or authority details.
// Hive should pick up the local cluster's HDFS schema/authority.
externalTableBasePathWithClause();
- List<String> loadWithClause = Collections.singletonList(
+ List<String> loadWithClause = Arrays.asList(
"'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
- + REPLICA_EXTERNAL_BASE + "'"
+ + REPLICA_EXTERNAL_BASE + "'",
+ "'distcp.options.update'=''"
);
WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
@@ -359,19 +360,62 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
replica.load(replicatedDbName, tuple.dumpLocation);
+ Path externalTableLocation =
+ new Path("/" + testName.getMethodName() + "/t1/");
+ DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+ fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
tuple = primary.run("use " + primaryDbName)
- .run("create external table t1 (place string) partitioned by (country string)")
+ .run("create external table t1 (place string) partitioned by (country string) row format "
+ + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+ + "'")
.run("alter table t1 add partition(country='india')")
.run("alter table t1 add partition(country='us')")
.dump(primaryDbName, tuple.lastReplicationId);
+ assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
+
+ // Add new data externally, to a partition, but under the partition level top directory
+ // Also, it is added after dumping the events but data should be seen at target after REPL LOAD.
+ Path partitionDir = new Path(externalTableLocation, "country=india");
+ try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
+ outputStream.write("pune\n".getBytes());
+ outputStream.write("mumbai\n".getBytes());
+ }
+
+ try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
+ outputStream.write("bangalore\n".getBytes());
+ }
+
List<String> loadWithClause = externalTableBasePathWithClause();
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't1'")
.verifyResult("t1")
.run("show partitions t1")
- .verifyResults(new String[] { "country=india", "country=us" });
+ .verifyResults(new String[] { "country=india", "country=us" })
+ .run("select place from t1 order by place")
+ .verifyResults(new String[] { "bangalore", "mumbai", "pune" });
+
+ // Delete one of the files and update the other one.
+ fs.delete(new Path(partitionDir, "file.txt"), true);
+ fs.delete(new Path(partitionDir, "file1.txt"), true);
+ try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
+ outputStream.write("chennai\n".getBytes());
+ }
+
+ // Repl load with zero events but external tables location info should be present.
+ tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+ assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
+
+ replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show partitions t1")
+ .verifyResults(new String[] { "country=india", "country=us" })
+ .run("select place from t1 order by place")
+ .verifyResults(new String[] { "chennai" });
Hive hive = Hive.get(replica.getConf());
Set<Partition> partitions =
@@ -391,7 +435,6 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
for (String path : paths) {
assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
}
-
}
@Test
@@ -492,9 +535,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
fileSystem.mkdirs(externalTableLocation);
// this is required since the same filesystem is used in both source and target
- return Collections.singletonList(
- "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
- + externalTableLocation.toString() + "'"
+ return Arrays.asList(
+ "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+ + externalTableLocation.toString() + "'",
+ "'distcp.options.pugpb'=''"
);
}
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index b6f70eb..e774419 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1110,20 +1110,30 @@ public class Hadoop23Shims extends HadoopShimsSecure {
private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) {
+ // -update and -delete are mandatory options for directory copy to work.
+ // -pbx is default preserve options if user doesn't pass any.
List<String> params = new ArrayList<String>();
+ boolean needToAddPreserveOption = true;
for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
String distCpOption = entry.getKey();
String distCpVal = entry.getValue();
+ if (distCpOption.startsWith("p")) {
+ needToAddPreserveOption = false;
+ }
params.add("-" + distCpOption);
if ((distCpVal != null) && (!distCpVal.isEmpty())){
params.add(distCpVal);
}
}
- if (params.size() == 0){
- // if no entries were added via conf, we initiate our defaults
- params.add("-update");
+ if (needToAddPreserveOption) {
params.add("-pbx");
}
+ if (!params.contains("-update")) {
+ params.add("-update");
+ }
+ if (!params.contains("-delete")) {
+ params.add("-delete");
+ }
for (Path src : srcPaths) {
params.add(src.toString());
}
@@ -1150,10 +1160,11 @@ public class Hadoop23Shims extends HadoopShimsSecure {
@Override
public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws IOException {
DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
- .withSyncFolder(true)
- .withCRC(true)
- .preserve(FileAttribute.BLOCKSIZE)
- .build();
+ .withSyncFolder(true)
+ .withDeleteMissing(true)
+ .preserve(FileAttribute.BLOCKSIZE)
+ .preserve(FileAttribute.XATTR)
+ .build();
// Creates the command-line parameters for distcp
List<String> params = constructDistCpParams(srcPaths, dst, conf);
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
index 9a9311b..efb0074 100644
--- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -40,40 +40,41 @@ public class TestHadoop23Shims {
Hadoop23Shims shims = new Hadoop23Shims();
List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
- assertEquals(4, paramsDefault.size());
- assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+ assertEquals(5, paramsDefault.size());
assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
- assertEquals(copySrc.toString(), paramsDefault.get(2));
- assertEquals(copyDst.toString(), paramsDefault.get(3));
+ assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+ assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete"));
+ assertEquals(copySrc.toString(), paramsDefault.get(3));
+ assertEquals(copyDst.toString(), paramsDefault.get(4));
conf.set("distcp.options.foo", "bar"); // should set "-foo bar"
conf.set("distcp.options.blah", ""); // should set "-blah"
+ conf.set("distcp.options.pug", ""); // should set "-pug"
conf.set("dummy", "option"); // should be ignored.
List<String> paramsWithCustomParamInjection =
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
- assertEquals(5, paramsWithCustomParamInjection.size());
-
- // check that the defaults did not remain.
- assertTrue("Distcp -update not set if not requested",
- !paramsWithCustomParamInjection.contains("-update"));
+ assertEquals(8, paramsWithCustomParamInjection.size());
+
+ // check that the mandatory ones remain along with user passed ones.
+ assertTrue("Distcp -update set even if not requested",
+ paramsWithCustomParamInjection.contains("-update"));
+ assertTrue("Distcp -delete set even if not requested",
+ paramsWithCustomParamInjection.contains("-delete"));
+ assertTrue("Distcp -foo is set as passes",
+ paramsWithCustomParamInjection.contains("-foo"));
+ assertTrue("Distcp -blah is set as passes",
+ paramsWithCustomParamInjection.contains("-blah"));
+ assertTrue("Distcp -pug is set as passes",
+ paramsWithCustomParamInjection.contains("-pug"));
+ assertTrue("Distcp -pbx not set as overridden",
+ !paramsWithCustomParamInjection.contains("-pbx"));
assertTrue("Distcp -skipcrccheck not set if not requested",
!paramsWithCustomParamInjection.contains("-skipcrccheck"));
- assertTrue("Distcp -pbx not set if not requested",
- !paramsWithCustomParamInjection.contains("-pbx"));
-
- // the "-foo bar" and "-blah" params order is not guaranteed
- String firstParam = paramsWithCustomParamInjection.get(0);
- if (firstParam.equals("-foo")){
- // "-foo bar -blah" form
- assertEquals("bar", paramsWithCustomParamInjection.get(1));
- assertEquals("-blah", paramsWithCustomParamInjection.get(2));
- } else {
- // "-blah -foo bar" form
- assertEquals("-blah", paramsWithCustomParamInjection.get(0));
- assertEquals("-foo", paramsWithCustomParamInjection.get(1));
- assertEquals("bar", paramsWithCustomParamInjection.get(2));
- }
+
+ // the "-foo bar" order is guaranteed
+ int idx = paramsWithCustomParamInjection.indexOf("-foo");
+ assertEquals("bar", paramsWithCustomParamInjection.get(idx+1));
// the dummy option should not have made it either - only options
// beginning with distcp.options. should be honoured
@@ -82,8 +83,8 @@ public class TestHadoop23Shims {
assertTrue(!paramsWithCustomParamInjection.contains("option"));
assertTrue(!paramsWithCustomParamInjection.contains("-option"));
- assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3));
- assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4));
+ assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(6));
+ assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(7));
}