You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2018/04/10 13:43:22 UTC

hive git commit: HIVE-18696: The partition folders might not get cleaned up properly in the HiveMetaStore.add_partitions_core method if an exception occurs (Marta Kuczora, via Peter Vary)

Repository: hive
Updated Branches:
  refs/heads/master be420098f -> 5eed779c6


HIVE-18696: The partition folders might not get cleaned up properly in the HiveMetaStore.add_partitions_core method if an exception occurs (Marta Kuczora, via Peter Vary)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5eed779c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5eed779c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5eed779c

Branch: refs/heads/master
Commit: 5eed779c611c7c766b69f992d76683c58b5772c9
Parents: be42009
Author: Marta Kuczora <ku...@cloudera.com>
Authored: Tue Apr 10 14:00:37 2018 +0200
Committer: Peter Vary <pv...@cloudera.com>
Committed: Tue Apr 10 15:42:56 2018 +0200

----------------------------------------------------------------------
 .../hadoop/hive/metastore/HiveMetaStore.java    | 305 ++++++++++---------
 .../metastore/client/TestAddPartitions.java     | 235 +++++++++++---
 .../client/TestAddPartitionsFromPartSpec.java   | 232 +++++++++++---
 3 files changed, 544 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5eed779c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 30922ba..c52d337 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -3125,54 +3125,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return ret;
     }
 
-    private static class PartValEqWrapper {
-      Partition partition;
-
-      PartValEqWrapper(Partition partition) {
-        this.partition = partition;
-      }
-
-      @Override
-      public int hashCode() {
-        return partition.isSetValues() ? partition.getValues().hashCode() : 0;
-      }
-
-      @Override
-      public boolean equals(Object obj) {
-        if (this == obj) {
-          return true;
-        }
-        if (obj == null || !(obj instanceof PartValEqWrapper)) {
-          return false;
-        }
-        Partition p1 = this.partition, p2 = ((PartValEqWrapper)obj).partition;
-        if (!p1.isSetValues() || !p2.isSetValues()) {
-          return p1.isSetValues() == p2.isSetValues();
-        }
-        if (p1.getValues().size() != p2.getValues().size()) {
-          return false;
-        }
-        for (int i = 0; i < p1.getValues().size(); ++i) {
-          String v1 = p1.getValues().get(i);
-          String v2 = p2.getValues().get(i);
-          if (v1 == null && v2 == null) {
-            continue;
-          }
-          if (v1 == null || !v1.equals(v2)) {
-            return false;
-          }
-        }
-        return true;
-      }
-    }
-
     private static class PartValEqWrapperLite {
       List<String> values;
       String location;
 
       PartValEqWrapperLite(Partition partition) {
         this.values = partition.isSetValues()? partition.getValues() : null;
-        this.location = partition.getSd().getLocation();
+        if (partition.getSd() != null) {
+          this.location = partition.getSd().getLocation();
+        }
       }
 
       @Override
@@ -3220,7 +3181,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       logInfo("add_partitions");
       boolean success = false;
       // Ensures that the list doesn't have dups, and keeps track of directories we have created.
-      final Map<PartValEqWrapper, Boolean> addedPartitions = new ConcurrentHashMap<>();
+      final Map<PartValEqWrapperLite, Boolean> addedPartitions = new ConcurrentHashMap<>();
       final List<Partition> newParts = new ArrayList<>();
       final List<Partition> existingParts = new ArrayList<>();
       Table tbl = null;
@@ -3239,78 +3200,103 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
         }
 
-        List<Future<Partition>> partFutures = Lists.newArrayList();
-        final Table table = tbl;
+        Set<PartValEqWrapperLite> partsToAdd = new HashSet<>(parts.size());
+        List<Partition> partitionsToAdd = new ArrayList<>(parts.size());
         for (final Partition part : parts) {
+          // Iterate through the partitions and validate them. If one of the partitions is
+          // incorrect, an exception will be thrown before the threads which create the partition
+          // folders are submitted. This way we can be sure that no partition and no partition
+          // folder will be created if the list contains an invalid partition.
           if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
-            throw new MetaException("Partition does not belong to target table " +
-                getCatalogQualifiedTableName(catName, dbName, tblName) + ": " +
-                    part);
+            String errorMsg = String.format(
+                "Partition does not belong to target table %s. It belongs to the table %s.%s : %s",
+                getCatalogQualifiedTableName(catName, dbName, tblName), part.getDbName(),
+                part.getTableName(), part.toString());
+            throw new MetaException(errorMsg);
           }
 
           boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
           if (!shouldAdd) {
             existingParts.add(part);
-            LOG.info("Not adding partition " + part + " as it already exists");
+            LOG.info("Not adding partition {} as it already exists", part);
             continue;
           }
 
-          final UserGroupInformation ugi;
-          try {
-            ugi = UserGroupInformation.getCurrentUser();
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-
-          partFutures.add(threadPool.submit(new Callable<Partition>() {
-            @Override
-            public Partition call() throws Exception {
-              ugi.doAs(new PrivilegedExceptionAction<Object>() {
-                @Override
-                public Object run() throws Exception {
-                  try {
-                    boolean madeDir = createLocationForAddedPartition(table, part);
-                    if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
-                      // Technically, for ifNotExists case, we could insert one and discard the other
-                      // because the first one now "exists", but it seems better to report the problem
-                      // upstream as such a command doesn't make sense.
-                      throw new MetaException("Duplicate partitions in the list: " + part);
-                    }
-                    initializeAddedPartition(table, part, madeDir);
-                  } catch (MetaException e) {
-                    throw new IOException(e.getMessage(), e);
-                  }
-                  return null;
-                }
-              });
-              return part;
+          if (!partsToAdd.add(new PartValEqWrapperLite(part))) {
+            // Technically, for ifNotExists case, we could insert one and discard the other
+            // because the first one now "exists", but it seems better to report the problem
+            // upstream as such a command doesn't make sense.
+            throw new MetaException("Duplicate partitions in the list: " + part);
+          }
+
+          partitionsToAdd.add(part);
+        }
+
+        final AtomicBoolean failureOccurred = new AtomicBoolean(false);
+        final Table table = tbl;
+        List<Future<Partition>> partFutures = new ArrayList<>(partitionsToAdd.size());
+
+        final UserGroupInformation ugi;
+        try {
+          ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
+        for (final Partition partition : partitionsToAdd) {
+          initializePartitionParameters(table, partition);
+
+          partFutures.add(threadPool.submit(() -> {
+            if (failureOccurred.get()) {
+              return null;
             }
+            ugi.doAs((PrivilegedExceptionAction<Partition>) () -> {
+              try {
+                boolean madeDir = createLocationForAddedPartition(table, partition);
+                addedPartitions.put(new PartValEqWrapperLite(partition), madeDir);
+                initializeAddedPartition(table, partition, madeDir);
+              } catch (MetaException e) {
+                throw new IOException(e.getMessage(), e);
+              }
+              return null;
+            });
+            return partition;
           }));
         }
 
-        try {
-          for (Future<Partition> partFuture : partFutures) {
+        String errorMessage = null;
+        for (Future<Partition> partFuture : partFutures) {
+          try {
             Partition part = partFuture.get();
-            if (part != null) {
+            if (part != null && !failureOccurred.get()) {
               newParts.add(part);
             }
+          } catch (InterruptedException | ExecutionException e) {
+            // If an exception is thrown in the execution of a task, set the failureOccurred flag to
+            // true. This flag is visible in the tasks and if its value is true, the partition
+            // folders won't be created.
+            // Then iterate through the remaining tasks and wait for them to finish. The tasks which
+            // are started before the flag got set will then finish creating the partition folders.
+            // The tasks which are started after the flag got set, won't create the partition
+            // folders, to avoid unnecessary work.
+            // This way it is sure that all tasks are finished, when entering the finally part where
+            // the partition folders are cleaned up. It won't happen that a task is still running
+            // when cleaning up the folders, so it is sure we won't have leftover folders.
+            // Canceling the other tasks would be also an option but during testing it turned out
+            // that it is not a trustworthy solution to avoid leftover folders.
+            failureOccurred.compareAndSet(false, true);
+            errorMessage = e.getMessage();
           }
-        } catch (InterruptedException | ExecutionException e) {
-          // cancel other tasks
-          for (Future<Partition> partFuture : partFutures) {
-            partFuture.cancel(true);
-          }
-          throw new MetaException(e.getMessage());
+        }
+
+        if (failureOccurred.get()) {
+          throw new MetaException(errorMessage);
         }
 
         if (!newParts.isEmpty()) {
-          success = ms.addPartitions(catName, dbName, tblName, newParts);
-        } else {
-          success = true;
+          ms.addPartitions(catName, dbName, tblName, newParts);
         }
 
-        // Setting success to false to make sure that if the listener fails, rollback happens.
-        success = false;
         // Notification is generated for newly created partitions only. The subset of partitions
         // that already exist (existingParts), will not generate notifications.
         if (!transactionalListeners.isEmpty()) {
@@ -3324,10 +3310,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       } finally {
         if (!success) {
           ms.rollbackTransaction();
-          for (Map.Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
+          for (Map.Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet()) {
             if (e.getValue()) {
               // we just created this directory - it's not a case of pre-creation, so we nuke.
-              wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
+              wh.deleteDir(new Path(e.getKey().location), true);
             }
           }
 
@@ -3465,70 +3451,98 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this));
-        List<Future<Partition>> partFutures = Lists.newArrayList();
-        final Table table = tbl;
-        while(partitionIterator.hasNext()) {
+        Set<PartValEqWrapperLite> partsToAdd = new HashSet<>(partitionSpecProxy.size());
+        List<Partition> partitionsToAdd = new ArrayList<>(partitionSpecProxy.size());
+        while (partitionIterator.hasNext()) {
+          // Iterate through the partitions and validate them. If one of the partitions is
+          // incorrect, an exception will be thrown before the threads which create the partition
+          // folders are submitted. This way we can be sure that no partition or partition folder
+          // will be created if the list contains an invalid partition.
           final Partition part = partitionIterator.getCurrent();
 
           if (!part.getTableName().equalsIgnoreCase(tblName) || !part.getDbName().equalsIgnoreCase(dbName)) {
-            throw new MetaException("Partition does not belong to target table "
-                + dbName + "." + tblName + ": " + part);
+            String errorMsg = String.format(
+                "Partition does not belong to target table %s.%s. It belongs to the table %s.%s : %s",
+                dbName, tblName, part.getDbName(), part.getTableName(), part.toString());
+            throw new MetaException(errorMsg);
           }
 
           boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
           if (!shouldAdd) {
-            LOG.info("Not adding partition " + part + " as it already exists");
+            LOG.info("Not adding partition {} as it already exists", part);
             continue;
           }
 
-          final UserGroupInformation ugi;
-          try {
-            ugi = UserGroupInformation.getCurrentUser();
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-
-          partFutures.add(threadPool.submit(new Callable<Partition>() {
-            @Override public Partition call() throws Exception {
-              ugi.doAs(new PrivilegedExceptionAction<Partition>() {
-                @Override
-                public Partition run() throws Exception {
-                  try {
-                    boolean madeDir = createLocationForAddedPartition(table, part);
-                    if (addedPartitions.put(new PartValEqWrapperLite(part), madeDir) != null) {
-                      // Technically, for ifNotExists case, we could insert one and discard the other
-                      // because the first one now "exists", but it seems better to report the problem
-                      // upstream as such a command doesn't make sense.
-                      throw new MetaException("Duplicate partitions in the list: " + part);
-                    }
-                    initializeAddedPartition(table, part, madeDir);
-                  } catch (MetaException e) {
-                    throw new IOException(e.getMessage(), e);
-                  }
-                  return null;
-                }
-              });
-              return part;
-            }
-          }));
+          if (!partsToAdd.add(new PartValEqWrapperLite(part))) {
+            // Technically, for ifNotExists case, we could insert one and discard the other
+            // because the first one now "exists", but it seems better to report the problem
+            // upstream as such a command doesn't make sense.
+            throw new MetaException("Duplicate partitions in the list: " + part);
+          }
+
+          partitionsToAdd.add(part);
           partitionIterator.next();
         }
 
+        final AtomicBoolean failureOccurred = new AtomicBoolean(false);
+        List<Future<Partition>> partFutures = new ArrayList<>(partitionsToAdd.size());
+        final Table table = tbl;
+
+        final UserGroupInformation ugi;
         try {
-          for (Future<Partition> partFuture : partFutures) {
+          ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
+        for (final Partition partition : partitionsToAdd) {
+          initializePartitionParameters(table, partition);
+
+          partFutures.add(threadPool.submit(() -> {
+            if (failureOccurred.get()) {
+              return null;
+            }
+            ugi.doAs((PrivilegedExceptionAction<Partition>) () -> {
+              try {
+                boolean madeDir = createLocationForAddedPartition(table, partition);
+                addedPartitions.put(new PartValEqWrapperLite(partition), madeDir);
+                initializeAddedPartition(table, partition, madeDir);
+              } catch (MetaException e) {
+                throw new IOException(e.getMessage(), e);
+              }
+              return null;
+            });
+            return partition;
+          }));
+        }
+
+        String errorMessage = null;
+        for (Future<Partition> partFuture : partFutures) {
+          try {
             partFuture.get();
+          } catch (InterruptedException | ExecutionException e) {
+            // If an exception is thrown in the execution of a task, set the failureOccurred flag to
+            // true. This flag is visible in the tasks and if its value is true, the partition
+            // folders won't be created.
+            // Then iterate through the remaining tasks and wait for them to finish. The tasks which
+            // are started before the flag got set will then finish creating the partition folders.
+            // The tasks which are started after the flag got set, won't create the partition
+            // folders, to avoid unnecessary work.
+            // This way it is sure that all tasks are finished, when entering the finally part where
+            // the partition folders are cleaned up. It won't happen that a task is still running
+            // when cleaning up the folders, so it is sure we won't have leftover folders.
+            // Canceling the other tasks would be also an option but during testing it turned out
+            // that it is not a trustworthy solution to avoid leftover folders.
+            failureOccurred.compareAndSet(false, true);
+            errorMessage = e.getMessage();
           }
-        } catch (InterruptedException | ExecutionException e) {
-          // cancel other tasks
-          for (Future<Partition> partFuture : partFutures) {
-            partFuture.cancel(true);
-          }
-          throw new MetaException(e.getMessage());
         }
 
-        success = ms.addPartitions(catName, dbName, tblName, partitionSpecProxy, ifNotExists);
-        //setting success to false to make sure that if the listener fails, rollback happens.
-        success = false;
+        if (failureOccurred.get()) {
+          throw new MetaException(errorMessage);
+        }
+
+        ms.addPartitions(catName, dbName, tblName, partitionSpecProxy, ifNotExists);
 
         if (!transactionalListeners.isEmpty()) {
           transactionalListenerResponses =
@@ -3637,6 +3651,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
         part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
       }
+    }
+
+    private void initializePartitionParameters(final Table tbl, final Partition part)
+        throws MetaException {
+      initializePartitionParameters(tbl,
+          new PartitionSpecProxy.SimplePartitionWrapperIterator(part));
+    }
+
+    private void initializePartitionParameters(final Table tbl,
+        final PartitionSpecProxy.PartitionIterator part) throws MetaException {
 
       // Inherit table properties into partition properties.
       Map<String, String> tblParams = tbl.getParameters();
@@ -3678,6 +3702,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         boolean madeDir = createLocationForAddedPartition(tbl, part);
         try {
           initializeAddedPartition(tbl, part, madeDir);
+          initializePartitionParameters(tbl, part);
           success = ms.addPartition(part);
         } finally {
           if (!success && madeDir) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5eed779c/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java
index 8555eee..f8497c7 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitions.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.metastore.client;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -434,20 +436,39 @@ public class TestAddPartitions extends MetaStoreClientTest {
   @Test(expected = MetaException.class)
   public void testAddPartitionForView() throws Exception {
 
-    Table table = new TableBuilder()
-        .setDbName(DB_NAME)
-        .setTableName(TABLE_NAME)
-        .setType("VIRTUAL_VIEW")
-        .addCol("test_id", "int", "test col id")
-        .addCol("test_value", DEFAULT_COL_TYPE, "test col value")
-        .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE)
-        .setLocation(null)
-        .create(client, metaStore.getConf());
-    Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
     client.add_partition(partition);
   }
 
   @Test
+  public void testAddPartitionsForViewNullPartLocation() throws Exception {
+
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
+    partition.getSd().setLocation(null);
+    List<Partition> partitions = Lists.newArrayList(partition);
+    client.add_partitions(partitions);
+    Partition part = client.getPartition(DB_NAME, tableName, "year=2017");
+    Assert.assertNull(part.getSd().getLocation());
+  }
+
+  @Test
+  public void testAddPartitionsForViewNullPartSd() throws Exception {
+
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
+    partition.setSd(null);
+    List<Partition> partitions = Lists.newArrayList(partition);
+    client.add_partitions(partitions);
+    Partition part = client.getPartition(DB_NAME, tableName, "year=2017");
+    Assert.assertNull(part.getSd());
+  }
+
+  @Test
   public void testAddPartitionForExternalTable() throws Exception {
 
     String tableName = "part_add_ext_table";
@@ -736,20 +757,26 @@ public class TestAddPartitions extends MetaStoreClientTest {
     client.dropDatabase("parttestdb2", true, true, true);
   }
 
-  @Test(expected = MetaException.class)
+  @Test
   public void testAddPartitionsDuplicateInTheList() throws Exception {
 
     createTable();
+    List<Partition> partitions = buildPartitions(DB_NAME, TABLE_NAME,
+        Lists.newArrayList("2014", "2015", "2017", "2017", "2018", "2019"));
 
-    Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2017");
-    Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2016");
-    Partition partition3 = buildPartition(DB_NAME, TABLE_NAME, "2017");
+    try {
+      client.add_partitions(partitions);
+      Assert.fail("MetaException should have happened.");
+    } catch (MetaException e) {
+      // Expected exception
+    }
 
-    List<Partition> partitions = new ArrayList<>();
-    partitions.add(partition1);
-    partitions.add(partition2);
-    partitions.add(partition3);
-    client.add_partitions(partitions);
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertTrue(parts.isEmpty());
+    for (Partition partition : partitions) {
+      Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation())));
+    }
   }
 
   @Test
@@ -774,22 +801,32 @@ public class TestAddPartitions extends MetaStoreClientTest {
     Assert.assertTrue(parts.contains("year=THIS"));
   }
 
-  @Test(expected = AlreadyExistsException.class)
+  @Test
   public void testAddPartitionsAlreadyExists() throws Exception {
 
     createTable();
-    Partition partition = buildPartition(DB_NAME, TABLE_NAME, "2017");
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+    Partition partition =
+        buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016a");
     client.add_partition(partition);
 
-    Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2015");
-    Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017");
-    Partition partition3 = buildPartition(DB_NAME, TABLE_NAME, "2016");
+    List<Partition> partitions = buildPartitions(DB_NAME, TABLE_NAME,
+        Lists.newArrayList("2014", "2015", "2016", "2017", "2018"));
 
-    List<Partition> partitions = new ArrayList<>();
-    partitions.add(partition1);
-    partitions.add(partition2);
-    partitions.add(partition3);
-    client.add_partitions(partitions);
+    try {
+      client.add_partitions(partitions);
+      Assert.fail("AlreadyExistsException should have happened.");
+    } catch (AlreadyExistsException e) {
+      // Expected exception
+    }
+
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertEquals(1, parts.size());
+    Assert.assertEquals(partition.getValues(), parts.get(0).getValues());
+    for (Partition part : partitions) {
+      Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation())));
+    }
   }
 
   @Test(expected = MetaException.class)
@@ -877,16 +914,24 @@ public class TestAddPartitions extends MetaStoreClientTest {
 
     createTable();
     String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
-    Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016");
-    Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017");
+    Partition partition1 =
+        buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016");
+    Partition partition2 =
+        buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017");
     Partition partition3 =
         buildPartition(Lists.newArrayList("2015", "march"), getYearAndMonthPartCols(), 1);
     partition3.getSd().setLocation(tableLocation + "/year=2015/month=march");
+    Partition partition4 =
+        buildPartition(DB_NAME, TABLE_NAME, "2018", tableLocation + "/year=2018");
+    Partition partition5 =
+        buildPartition(DB_NAME, TABLE_NAME, "2019", tableLocation + "/year=2019");
 
     List<Partition> partitions = new ArrayList<>();
     partitions.add(partition1);
     partitions.add(partition2);
     partitions.add(partition3);
+    partitions.add(partition4);
+    partitions.add(partition5);
 
     try {
       client.add_partitions(partitions);
@@ -898,15 +943,9 @@ public class TestAddPartitions extends MetaStoreClientTest {
     List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
     Assert.assertNotNull(parts);
     Assert.assertTrue(parts.isEmpty());
-    // TODO: This does not work correctly. None of the partitions is created, but the folder
-    // for the first two is created. It is because in HiveMetaStore.add_partitions_core when
-    // going through the partitions, the first two are already put and started in the thread
-    // pool when the exception occurs in the third one. When the exception occurs, we go to
-    // the finally part, but the map can be empty (it depends on the progress of the other
-    // threads) so the folders won't be deleted.
-//    Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2016")));
-//    Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2017")));
-    Assert.assertFalse(metaStore.isPathExists(new Path(tableLocation + "/year=2015/month=march")));
+    for (Partition part : partitions) {
+      Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation())));
+    }
   }
 
   @Test(expected = MetaException.class)
@@ -1036,16 +1075,9 @@ public class TestAddPartitions extends MetaStoreClientTest {
   @Test(expected=MetaException.class)
   public void testAddPartitionsForView() throws Exception {
 
-    Table table = new TableBuilder()
-        .setDbName(DB_NAME)
-        .setTableName(TABLE_NAME)
-        .setType("VIRTUAL_VIEW")
-        .addCol("test_id", "int", "test col id")
-        .addCol("test_value", "string", "test col value")
-        .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE)
-        .setLocation(null)
-        .create(client, metaStore.getConf());
-    Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
     List<Partition> partitions = Lists.newArrayList(partition);
     client.add_partitions(partitions);
   }
@@ -1169,6 +1201,70 @@ public class TestAddPartitions extends MetaStoreClientTest {
     Assert.assertEquals("year=__HIVE_DEFAULT_PARTITION__", partitionNames.get(0));
   }
 
+  @Test
+  public void testAddPartitionsInvalidLocation() throws Exception {
+
+    createTable();
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+    Map<String, String> valuesAndLocations = new HashMap<>();
+    valuesAndLocations.put("2014", tableLocation + "/year=2014");
+    valuesAndLocations.put("2015", tableLocation + "/year=2015");
+    valuesAndLocations.put("2016", "invalidhost:80000/wrongfolder");
+    valuesAndLocations.put("2017", tableLocation + "/year=2017");
+    valuesAndLocations.put("2018", tableLocation + "/year=2018");
+    List<Partition> partitions = buildPartitions(DB_NAME, TABLE_NAME, valuesAndLocations);
+
+    try {
+      client.add_partitions(partitions);
+      Assert.fail("MetaException should have happened.");
+    } catch (MetaException e) {
+
+    }
+
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertTrue(parts.isEmpty());
+    for (Partition partition : partitions) {
+      if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) {
+        Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation())));
+      }
+    }
+  }
+
+  @Test
+  public void testAddPartitionsMoreThanThreadCountsOneFails() throws Exception {
+
+    createTable();
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+
+    List<Partition> partitions = new ArrayList<>();
+    for (int i = 0; i < 50; i++) {
+      String value = String.valueOf(2000 + i);
+      String location = tableLocation + "/year=" + value;
+      if (i == 30) {
+        location = "invalidhost:80000/wrongfolder";
+      }
+      Partition partition = buildPartition(DB_NAME, TABLE_NAME, value, location);
+      partitions.add(partition);
+    }
+
+    try {
+      client.add_partitions(partitions);
+      Assert.fail("MetaException should have happened.");
+    } catch (MetaException e) {
+
+    }
+
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertTrue(parts.isEmpty());
+    for (Partition partition : partitions) {
+      if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) {
+        Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation())));
+      }
+    }
+  }
+
   // Tests for List<Partition> add_partitions(List<Partition> partitions,
   // boolean ifNotExists, boolean needResults) method
 
@@ -1531,4 +1627,43 @@ public class TestAddPartitions extends MetaStoreClientTest {
     Assert.assertTrue("Per default the skewedInfo column value location map should be empty.",
         skewedInfo.getSkewedColValueLocationMaps().isEmpty());
   }
+
+  private List<Partition> buildPartitions(String dbName, String tableName, List<String> values)
+      throws MetaException {
+    // Builds one partition per value, located at <warehouse root>/<tableName>/year=<value>.
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName;
+    List<Partition> partitions = new ArrayList<>();
+
+    for (String value : values) {
+      Partition partition =
+          buildPartition(dbName, tableName, value, tableLocation + "/year=" + value);
+      partitions.add(partition);
+    }
+    return partitions;
+  }
+
+  private List<Partition> buildPartitions(String dbName, String tableName,
+      Map<String, String> valuesAndLocations) throws MetaException {
+    // Builds one partition per map entry: the key is the partition value, the value its location.
+    List<Partition> partitions = new ArrayList<>();
+
+    for (Map.Entry<String, String> valueAndLocation : valuesAndLocations.entrySet()) {
+      Partition partition =
+          buildPartition(dbName, tableName, valueAndLocation.getKey(), valueAndLocation.getValue());
+      partitions.add(partition);
+    }
+    return partitions;
+  }
+
+  private void createView(String tableName) throws Exception {
+    new TableBuilder()  // builds a partitioned VIRTUAL_VIEW in DB_NAME with a null location
+        .setDbName(DB_NAME)
+        .setTableName(tableName)
+        .setType("VIRTUAL_VIEW")
+        .addCol("test_id", "int", "test col id")
+        .addCol("test_value", "string", "test col value")
+        .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE)
+        .setLocation(null)
+        .create(client, metaStore.getConf());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5eed779c/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java
index b32954f..fc0c60f 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/client/TestAddPartitionsFromPartSpec.java
@@ -519,31 +519,57 @@ public class TestAddPartitionsFromPartSpec extends MetaStoreClientTest {
     Assert.assertTrue(metaStore.isPathExists(new Path(part.getSd().getLocation())));
   }
 
-  @Test(expected = AlreadyExistsException.class)
+  @Test
   public void testAddPartitionSpecPartAlreadyExists() throws Exception {
 
     createTable();
-    Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+    Partition partition =
+        buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016a");
     client.add_partition(partition);
 
-    Partition newPartition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
+    List<Partition> partitions = buildPartitions(DB_NAME, TABLE_NAME,
+        Lists.newArrayList("2014", "2015", "2016", "2017", "2018"));
     PartitionSpecProxy partitionSpecProxy =
-        buildPartitionSpec(DB_NAME, TABLE_NAME, null, Lists.newArrayList(newPartition));
-    client.add_partitions_pspec(partitionSpecProxy);
+        buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions);
+    // The spec contains the value "2016", which was already added above, so the call must fail.
+    try {
+      client.add_partitions_pspec(partitionSpecProxy);
+      Assert.fail("AlreadyExistsException should have happened.");
+    } catch (AlreadyExistsException e) {
+      // Expected exception
+    }
+    // Only the pre-existing partition may remain; none of the requested folders may exist.
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertEquals(1, parts.size());
+    Assert.assertEquals(partition.getValues(), parts.get(0).getValues());
+    for (Partition part : partitions) {
+      Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation())));
+    }
   }
 
-  @Test(expected = MetaException.class)
+  @Test
   public void testAddPartitionSpecPartDuplicateInSpec() throws Exception {
 
     createTable();
-    Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
-    Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
-    List<Partition> partitions = new ArrayList<>();
-    partitions.add(partition1);
-    partitions.add(partition2);
+    List<Partition> partitions = buildPartitions(DB_NAME, TABLE_NAME,
+        Lists.newArrayList("2014", "2015", "2017", "2017", "2018", "2019"));
     PartitionSpecProxy partitionSpecProxy =
         buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions);
-    client.add_partitions_pspec(partitionSpecProxy);
+    try {
+      client.add_partitions_pspec(partitionSpecProxy);
+      Assert.fail("MetaException should have happened.");
+    } catch (MetaException e) {
+      // Expected exception
+    }
+    // "2017" appears twice in the spec: nothing may be added and no folder may be left behind.
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertTrue(parts.isEmpty());
+    for (Partition partition : partitions) {
+      Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation())));
+    }
   }
 
   @Test(expected = MetaException.class)
@@ -672,20 +698,40 @@ public class TestAddPartitionsFromPartSpec extends MetaStoreClientTest {
   @Test(expected=MetaException.class)
   public void testAddPartitionSpecForView() throws Exception {
 
-    Table table = new TableBuilder()
-        .setDbName(DB_NAME)
-        .setTableName(TABLE_NAME)
-        .setType("VIRTUAL_VIEW")
-        .addCol("test_id", "int", "test col id")
-        .addCol("test_value", DEFAULT_COL_TYPE, "test col value")
-        .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE)
-        .setLocation(null)
-        .create(client, metaStore.getConf());
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
+    PartitionSpecProxy partitionSpecProxy =
+        buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition));
+    client.add_partitions_pspec(partitionSpecProxy);  // presumably throws the expected MetaException
+  }
 
-    Partition partition = buildPartition(DB_NAME, TABLE_NAME, DEFAULT_YEAR_VALUE);
+  @Test
+  public void testAddPartitionSpecForViewNullPartLocation() throws Exception {
+    // A view partition whose SD has a null location is added and read back with a null location.
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
+    partition.getSd().setLocation(null);
     PartitionSpecProxy partitionSpecProxy =
-        buildPartitionSpec(DB_NAME, TABLE_NAME, null, Lists.newArrayList(partition));
+        buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition));
     client.add_partitions_pspec(partitionSpecProxy);
+    Partition part = client.getPartition(DB_NAME, tableName, "year=2017");
+    Assert.assertNull(part.getSd().getLocation());
+  }
+
+  @Test
+  public void testAddPartitionsForViewNullPartSd() throws Exception {
+    // A view partition without a storage descriptor is added and read back with a null SD.
+    String tableName = "test_add_partition_view";
+    createView(tableName);
+    Partition partition = buildPartition(DB_NAME, tableName, DEFAULT_YEAR_VALUE);
+    partition.setSd(null);
+    PartitionSpecProxy partitionSpecProxy =
+        buildPartitionSpec(DB_NAME, tableName, null, Lists.newArrayList(partition));
+    client.add_partitions_pspec(partitionSpecProxy);
+    Partition part = client.getPartition(DB_NAME, tableName, "year=2017");
+    Assert.assertNull(part.getSd());
   }
 
   @Test
@@ -790,33 +836,104 @@ public class TestAddPartitionsFromPartSpec extends MetaStoreClientTest {
   public void testAddPartitionSpecOneInvalid() throws Exception {
 
     createTable();
-    Partition partition1 = buildPartition(DB_NAME, TABLE_NAME, "2016");
-    Partition partition2 = buildPartition(DB_NAME, TABLE_NAME, "2017");
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+    Partition partition1 =
+        buildPartition(DB_NAME, TABLE_NAME, "2016", tableLocation + "/year=2016");
+    Partition partition2 =
+        buildPartition(DB_NAME, TABLE_NAME, "2017", tableLocation + "/year=2017");
     Partition partition3 =
         buildPartition(Lists.newArrayList("2015", "march"), getYearAndMonthPartCols(), 1);
-    partition3.getSd().setLocation(metaStore.getWarehouseRoot() + "/addparttest");
-    List<Partition> partitions = Lists.newArrayList(partition1, partition2, partition3);
+    partition3.getSd().setLocation(tableLocation + "/year=2015/month=march");
+    Partition partition4 =
+        buildPartition(DB_NAME, TABLE_NAME, "2018", tableLocation + "/year=2018");
+    Partition partition5 =
+        buildPartition(DB_NAME, TABLE_NAME, "2019", tableLocation + "/year=2019");
+    List<Partition> partitions =
+        Lists.newArrayList(partition1, partition2, partition3, partition4, partition5);
+    PartitionSpecProxy partitionSpecProxy =
+        buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions);
+    // partition3 has two values (year, month) unlike the rest; the call is expected to fail.
+    try {
+      client.add_partitions_pspec(partitionSpecProxy);
+      Assert.fail("MetaException should have happened.");
+    } catch (MetaException e) {
+      // Expected exception
+    }
+    // Nothing may be added and none of the partition folders may exist after the failure.
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertTrue(parts.isEmpty());
+    for (Partition part : partitions) {
+      Assert.assertFalse(metaStore.isPathExists(new Path(part.getSd().getLocation())));
+    }
+  }
+
+  @Test
+  public void testAddPartitionSpecInvalidLocation() throws Exception {
+    // Five partitions are requested; "2016" uses the location "invalidhost:80000/wrongfolder".
+    createTable();
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+    Map<String, String> valuesAndLocations = new HashMap<>();
+    valuesAndLocations.put("2014", tableLocation + "/year=2014");
+    valuesAndLocations.put("2015", tableLocation + "/year=2015");
+    valuesAndLocations.put("2016", "invalidhost:80000/wrongfolder");
+    valuesAndLocations.put("2017", tableLocation + "/year=2017");
+    valuesAndLocations.put("2018", tableLocation + "/year=2018");
+    List<Partition> partitions = buildPartitions(DB_NAME, TABLE_NAME, valuesAndLocations);
+    PartitionSpecProxy partitionSpecProxy =
+        buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions);
+
+    try {
+      client.add_partitions_pspec(partitionSpecProxy);
+      Assert.fail("MetaException should have happened.");
+    } catch (MetaException e) {
+      // Expected exception
+    }
+    // No partition may be listed and no folder of the other locations may exist afterwards.
+    List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
+    Assert.assertNotNull(parts);
+    Assert.assertTrue(parts.isEmpty());
+    for (Partition partition : partitions) {
+      if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) {
+        Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation())));
+      }
+    }
+  }
+
+  @Test
+  public void testAddPartitionSpecMoreThanThreadCountsOneFails() throws Exception {
+    // Builds 50 partitions (values 2000..2049); the one at i == 30 gets a bad location.
+    createTable();
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + TABLE_NAME;
+
+    List<Partition> partitions = new ArrayList<>();
+    for (int i = 0; i < 50; i++) {
+      String value = String.valueOf(2000 + i);
+      String location = tableLocation + "/year=" + value;
+      if (i == 30) {
+        location = "invalidhost:80000/wrongfolder";
+      }
+      Partition partition = buildPartition(DB_NAME, TABLE_NAME, value, location);
+      partitions.add(partition);
+    }
     PartitionSpecProxy partitionSpecProxy =
         buildPartitionSpec(DB_NAME, TABLE_NAME, null, partitions);
+    // The call must fail with MetaException; nothing may be listed, no valid folder may remain.
     try {
       client.add_partitions_pspec(partitionSpecProxy);
-      Assert.fail("MetaException should have occurred.");
+      Assert.fail("MetaException should have happened.");
     } catch (MetaException e) {
-      // This is expected
+      // Expected exception
     }
 
     List<Partition> parts = client.listPartitions(DB_NAME, TABLE_NAME, MAX);
     Assert.assertNotNull(parts);
     Assert.assertTrue(parts.isEmpty());
-    // TODO: This does not work correctly. None of the partitions is created, but the folder
-    // for the first two is created. It is because in HiveMetaStore.add_partitions_core when
-    // going through the partitions, the first two are already put and started in the thread
-    // pool when the exception occurs in the third one.
-    // When the exception occurs, we go to the finally part, but the map can be empty
-    // (it depends on the progress of the other threads) so the folders won't be deleted.
-    // Assert.assertTrue(metaStore.isPathExists(new Path(partition1.getSd().getLocation())));
-    // Assert.assertTrue(metaStore.isPathExists(new Path(partition2.getSd().getLocation())));
-    // Assert.assertTrue(metaStore.isPathExists(new Path(partition3.getSd().getLocation())));
+    for (Partition partition : partitions) {
+      if (!"invalidhost:80000/wrongfolder".equals(partition.getSd().getLocation())) {
+        Assert.assertFalse(metaStore.isPathExists(new Path(partition.getSd().getLocation())));
+      }
+    }
   }
 
   // Helper methods
@@ -1049,4 +1166,43 @@ public class TestAddPartitionsFromPartSpec extends MetaStoreClientTest {
         sd.getLocation());
     Assert.assertTrue(metaStore.isPathExists(new Path(sd.getLocation())));
   }
+
+  private List<Partition> buildPartitions(String dbName, String tableName, List<String> values)
+      throws MetaException {
+    // Builds one partition per value, located at <warehouse root>/<tableName>/year=<value>.
+    String tableLocation = metaStore.getWarehouseRoot() + "/" + tableName;
+    List<Partition> partitions = new ArrayList<>();
+
+    for (String value : values) {
+      Partition partition =
+          buildPartition(dbName, tableName, value, tableLocation + "/year=" + value);
+      partitions.add(partition);
+    }
+    return partitions;
+  }
+
+  private List<Partition> buildPartitions(String dbName, String tableName,
+      Map<String, String> valuesAndLocations) throws MetaException {
+    // Builds one partition per map entry: the key is the partition value, the value its location.
+    List<Partition> partitions = new ArrayList<>();
+
+    for (Map.Entry<String, String> valueAndLocation : valuesAndLocations.entrySet()) {
+      Partition partition =
+          buildPartition(dbName, tableName, valueAndLocation.getKey(), valueAndLocation.getValue());
+      partitions.add(partition);
+    }
+    return partitions;
+  }
+
+  private void createView(String tableName) throws Exception {
+    new TableBuilder()  // builds a partitioned VIRTUAL_VIEW in DB_NAME with a null location
+        .setDbName(DB_NAME)
+        .setTableName(tableName)
+        .setType("VIRTUAL_VIEW")
+        .addCol("test_id", "int", "test col id")
+        .addCol("test_value", "string", "test col value")
+        .addPartCol(YEAR_COL_NAME, DEFAULT_COL_TYPE)
+        .setLocation(null)
+        .create(client, metaStore.getConf());
+  }
 }