You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2019/02/15 04:27:51 UTC

[hive] branch master updated: HIVE-21269: Mandate -update and -delete as DistCp options to sync data files for external tables replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 02d3551  HIVE-21269: Mandate -update and -delete as DistCp options to sync data files for external tables replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
02d3551 is described below

commit 02d35510b93068aa42020fe756eeec9496880959
Author: Sankar Hariappan <sa...@apache.org>
AuthorDate: Fri Feb 15 09:56:28 2019 +0530

    HIVE-21269: Mandate -update and -delete as DistCp options to sync data files for external tables replication (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
 .../TestReplicationScenariosExternalTables.java    | 60 +++++++++++++++++++---
 .../apache/hadoop/hive/shims/Hadoop23Shims.java    | 25 ++++++---
 .../hadoop/hive/shims/TestHadoop23Shims.java       | 53 +++++++++----------
 3 files changed, 97 insertions(+), 41 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 81af2fe..2cdc35f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -220,9 +220,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     // Create base directory but use HDFS path without schema or authority details.
     // Hive should pick up the local cluster's HDFS schema/authority.
     externalTableBasePathWithClause();
-    List<String> loadWithClause = Collections.singletonList(
+    List<String> loadWithClause = Arrays.asList(
             "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
-                    + REPLICA_EXTERNAL_BASE + "'"
+                    + REPLICA_EXTERNAL_BASE + "'",
+            "'distcp.options.update'=''"
     );
 
     WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
@@ -359,19 +360,62 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
     replica.load(replicatedDbName, tuple.dumpLocation);
 
+    Path externalTableLocation =
+            new Path("/" + testName.getMethodName() + "/t1/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
     tuple = primary.run("use " + primaryDbName)
-        .run("create external table t1 (place string) partitioned by (country string)")
+        .run("create external table t1 (place string) partitioned by (country string) row format "
+            + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+            + "'")
         .run("alter table t1 add partition(country='india')")
         .run("alter table t1 add partition(country='us')")
         .dump(primaryDbName, tuple.lastReplicationId);
 
+    assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
+
+    // Add new data externally, to a partition, but under the partition level top directory
+    // Also, it is added after dumping the events but data should be seen at target after REPL LOAD.
+    Path partitionDir = new Path(externalTableLocation, "country=india");
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
+      outputStream.write("pune\n".getBytes());
+      outputStream.write("mumbai\n".getBytes());
+    }
+
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
+      outputStream.write("bangalore\n".getBytes());
+    }
+
     List<String> loadWithClause = externalTableBasePathWithClause();
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
         .run("show partitions t1")
-        .verifyResults(new String[] { "country=india", "country=us" });
+        .verifyResults(new String[] { "country=india", "country=us" })
+        .run("select place from t1 order by place")
+        .verifyResults(new String[] { "bangalore", "mumbai", "pune" });
+
+    // Delete one of the files and update the other one.
+    fs.delete(new Path(partitionDir, "file.txt"), true);
+    fs.delete(new Path(partitionDir, "file1.txt"), true);
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
+      outputStream.write("chennai\n".getBytes());
+    }
+
+    // Repl load with zero events but external tables location info should be present.
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show partitions t1")
+            .verifyResults(new String[] { "country=india", "country=us" })
+            .run("select place from t1 order by place")
+            .verifyResults(new String[] { "chennai" });
 
     Hive hive = Hive.get(replica.getConf());
     Set<Partition> partitions =
@@ -391,7 +435,6 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     for (String path : paths) {
       assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
     }
-
   }
 
   @Test
@@ -492,9 +535,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     fileSystem.mkdirs(externalTableLocation);
 
     // this is required since the same filesystem is used in both source and target
-    return Collections.singletonList(
-        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
-            + externalTableLocation.toString() + "'"
+    return Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+                    + externalTableLocation.toString() + "'",
+            "'distcp.options.pugpb'=''"
     );
   }
 
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index b6f70eb..e774419 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1110,20 +1110,30 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
 
   List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) {
+    // -update and -delete are mandatory options for directory copy to work.
+    // -pbx is the default preserve option if the user doesn't pass any.
     List<String> params = new ArrayList<String>();
+    boolean needToAddPreserveOption = true;
     for (Map.Entry<String,String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
       String distCpOption = entry.getKey();
       String distCpVal = entry.getValue();
+      if (distCpOption.startsWith("p")) {
+        needToAddPreserveOption = false;
+      }
       params.add("-" + distCpOption);
       if ((distCpVal != null) && (!distCpVal.isEmpty())){
         params.add(distCpVal);
       }
     }
-    if (params.size() == 0){
-      // if no entries were added via conf, we initiate our defaults
-      params.add("-update");
+    if (needToAddPreserveOption) {
       params.add("-pbx");
     }
+    if (!params.contains("-update")) {
+      params.add("-update");
+    }
+    if (!params.contains("-delete")) {
+      params.add("-delete");
+    }
     for (Path src : srcPaths) {
       params.add(src.toString());
     }
@@ -1150,10 +1160,11 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   @Override
   public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws IOException {
        DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
-        .withSyncFolder(true)
-        .withCRC(true)
-        .preserve(FileAttribute.BLOCKSIZE)
-        .build();
+               .withSyncFolder(true)
+               .withDeleteMissing(true)
+               .preserve(FileAttribute.BLOCKSIZE)
+               .preserve(FileAttribute.XATTR)
+               .build();
 
     // Creates the command-line parameters for distcp
     List<String> params = constructDistCpParams(srcPaths, dst, conf);
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
index 9a9311b..efb0074 100644
--- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -40,40 +40,41 @@ public class TestHadoop23Shims {
     Hadoop23Shims shims = new Hadoop23Shims();
     List<String> paramsDefault = shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
 
-    assertEquals(4, paramsDefault.size());
-    assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+    assertEquals(5, paramsDefault.size());
     assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
-    assertEquals(copySrc.toString(), paramsDefault.get(2));
-    assertEquals(copyDst.toString(), paramsDefault.get(3));
+    assertTrue("Distcp -update set by default", paramsDefault.contains("-update"));
+    assertTrue("Distcp -delete set by default", paramsDefault.contains("-delete"));
+    assertEquals(copySrc.toString(), paramsDefault.get(3));
+    assertEquals(copyDst.toString(), paramsDefault.get(4));
 
     conf.set("distcp.options.foo", "bar"); // should set "-foo bar"
     conf.set("distcp.options.blah", ""); // should set "-blah"
+    conf.set("distcp.options.pug", ""); // should set "-pug"
     conf.set("dummy", "option"); // should be ignored.
     List<String> paramsWithCustomParamInjection =
         shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
 
-    assertEquals(5, paramsWithCustomParamInjection.size());
-
-    // check that the defaults did not remain.
-    assertTrue("Distcp -update not set if not requested",
-        !paramsWithCustomParamInjection.contains("-update"));
+    assertEquals(8, paramsWithCustomParamInjection.size());
+
+    // check that the mandatory ones remain along with user passed ones.
+    assertTrue("Distcp -update set even if not requested",
+        paramsWithCustomParamInjection.contains("-update"));
+    assertTrue("Distcp -delete set even if not requested",
+            paramsWithCustomParamInjection.contains("-delete"));
+    assertTrue("Distcp -foo is set as passes",
+            paramsWithCustomParamInjection.contains("-foo"));
+    assertTrue("Distcp -blah is set as passes",
+            paramsWithCustomParamInjection.contains("-blah"));
+    assertTrue("Distcp -pug is set as passes",
+            paramsWithCustomParamInjection.contains("-pug"));
+    assertTrue("Distcp -pbx not set as overridden",
+            !paramsWithCustomParamInjection.contains("-pbx"));
     assertTrue("Distcp -skipcrccheck not set if not requested",
         !paramsWithCustomParamInjection.contains("-skipcrccheck"));
-    assertTrue("Distcp -pbx not set if not requested",
-        !paramsWithCustomParamInjection.contains("-pbx"));
-
-    // the "-foo bar" and "-blah" params order is not guaranteed
-    String firstParam = paramsWithCustomParamInjection.get(0);
-    if (firstParam.equals("-foo")){
-      // "-foo bar -blah"  form
-      assertEquals("bar", paramsWithCustomParamInjection.get(1));
-      assertEquals("-blah", paramsWithCustomParamInjection.get(2));
-    } else {
-      // "-blah -foo bar" form
-      assertEquals("-blah", paramsWithCustomParamInjection.get(0));
-      assertEquals("-foo", paramsWithCustomParamInjection.get(1));
-      assertEquals("bar", paramsWithCustomParamInjection.get(2));
-    }
+
+    // the "-foo bar" order is guaranteed
+    int idx = paramsWithCustomParamInjection.indexOf("-foo");
+    assertEquals("bar", paramsWithCustomParamInjection.get(idx+1));
 
     // the dummy option should not have made it either - only options
     // beginning with distcp.options. should be honoured
@@ -82,8 +83,8 @@ public class TestHadoop23Shims {
     assertTrue(!paramsWithCustomParamInjection.contains("option"));
     assertTrue(!paramsWithCustomParamInjection.contains("-option"));
 
-    assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(3));
-    assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(4));
+    assertEquals(copySrc.toString(), paramsWithCustomParamInjection.get(6));
+    assertEquals(copyDst.toString(), paramsWithCustomParamInjection.get(7));
 
   }