You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2015/07/22 22:48:07 UTC

[39/50] [abbrv] hive git commit: HIVE-11229 Mutation API: Coordinator communication with meta store should be optional (Elliot West via gates)

HIVE-11229 Mutation API: Coordinator communication with meta store should be optional (Elliot West via gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6ec72de7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6ec72de7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6ec72de7

Branch: refs/heads/hbase-metastore
Commit: 6ec72de79ebb898f699402e8a2d7681c4e39ecd2
Parents: dfdc670
Author: Alan Gates <ga...@hortonworks.com>
Authored: Tue Jul 21 11:08:59 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Tue Jul 21 11:08:59 2015 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/mutate/package.html |  31 ++++-
 .../mutate/worker/CreatePartitionHelper.java    |  83 --------------
 .../mutate/worker/MetaStorePartitionHelper.java | 102 +++++++++++++++++
 .../mutate/worker/MutatorCoordinator.java       |  21 ++--
 .../worker/MutatorCoordinatorBuilder.java       |  41 +++++--
 .../mutate/worker/PartitionHelper.java          |  17 +++
 .../mutate/worker/WarehousePartitionHelper.java |  69 ++++++++++++
 .../worker/TestMetaStorePartitionHelper.java    | 112 +++++++++++++++++++
 .../mutate/worker/TestMutatorCoordinator.java   |  40 ++++---
 .../worker/TestWarehousePartitionHelper.java    |  57 ++++++++++
 10 files changed, 452 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
index 09a55b6..72ce6b1 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -416,17 +416,39 @@ data, is the responsibility of the client using the API.
 </p>
 
 <h3>Dynamic Partition Creation:</h3>
+<p>
 It is very likely to be desirable to have new partitions created
 automatically (say on a hourly basis). In such cases requiring the Hive
-admin to pre-create the necessary partitions may not be reasonable.
-Consequently the API allows coordinators to create partitions as needed
-(see:
+admin to pre-create the necessary partitions may not be reasonable. The
+API allows coordinators to create partitions as needed (see:
 <code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code>
 ). Partition creation being an atomic action, multiple coordinators can
 race to create the partition, but only one would succeed, so
 coordinators clients need not synchronize when creating a partition. The
 user of the coordinator process needs to be given write permissions on
 the Hive table in order to create partitions.
+</p>
+
+<p>Care must be taken when using this option as it requires that the
+coordinators maintain a connection with the meta store database. When
+coordinators are running in a distributed environment (as is likely the
+case) it is possible for them to overwhelm the meta store. In such cases it
+may be better to disable partition creation and collect a set of
+affected partitions as part of your ETL merge process. These can then be
+created with a single meta store connection in your client code, once
+the cluster side merge process is complete.</p>
+<p>
+Finally, note that when partition creation is disabled the coordinators
+must synthesize the partition URI as they cannot retrieve it from the
+meta store. This may cause problems if the layout of your partitions in
+HDFS does not follow the Hive standard (as implemented in
+<code>
+org.apache.hadoop.hive.metastore.Warehouse.getPartitionPath(Path,
+LinkedHashMap
+&lt;String, String&gt;)
+</code>
+).
+</p>
 
 <h2>Reading data</h2>
 
@@ -473,6 +495,7 @@ table. The <code>AcidTableSerializer</code> can help you transport the <code>Aci
 when your workers are in a distributed environment.
 </li>
 <li>Compute your mutation set (this is your ETL merge process).</li>
+<li>Optionally: collect the set of affected partitions.</li>
 <li>Append bucket ids to insertion records. A <code>BucketIdResolver</code>
 can help here.
 </li>
@@ -481,6 +504,8 @@ can help here.
 <li>Close your coordinators.</li>
 <li>Abort or commit the transaction.</li>
 <li>Close your mutation client.</li>
+<li>Optionally: create any affected partitions that do not exist in
+the meta store.</li>
 </ol>
 <p>
 See

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
deleted file mode 100644
index 9aab346..0000000
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.hive.hcatalog.streaming.mutate.worker;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utility class that can create new table partitions within the {@link IMetaStoreClient meta store}. */
-class CreatePartitionHelper {
-
-  private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class);
-
-  private final IMetaStoreClient metaStoreClient;
-  private final String databaseName;
-  private final String tableName;
-
-  CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) {
-    this.metaStoreClient = metaStoreClient;
-    this.databaseName = databaseName;
-    this.tableName = tableName;
-  }
-
-  /** Returns the expected {@link Path} for a given partition value. */
-  Path getPathForPartition(List<String> newPartitionValues) throws WorkerException {
-    try {
-      String location;
-      if (newPartitionValues.isEmpty()) {
-        location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation();
-      } else {
-        location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation();
-      }
-      LOG.debug("Found path {} for partition {}", location, newPartitionValues);
-      return new Path(location);
-    } catch (NoSuchObjectException e) {
-      throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e);
-    } catch (TException e) {
-      throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
-          + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
-    }
-  }
-
-  /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. */
-  void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
-    if (newPartitionValues.isEmpty()) {
-      return;
-    }
-
-    try {
-      LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
-      Table table = metaStoreClient.getTable(databaseName, tableName);
-
-      Partition partition = new Partition();
-      partition.setDbName(table.getDbName());
-      partition.setTableName(table.getTableName());
-      StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
-      partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
-          + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
-      partition.setSd(partitionSd);
-      partition.setValues(newPartitionValues);
-
-      metaStoreClient.add_partition(partition);
-    } catch (AlreadyExistsException e) {
-      LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues);
-    } catch (NoSuchObjectException e) {
-      LOG.error("Failed to create partition : " + newPartitionValues, e);
-      throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e);
-    } catch (TException e) {
-      LOG.error("Failed to create partition : " + newPartitionValues, e);
-      throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '"
-          + databaseName + "." + tableName + "'", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java
new file mode 100644
index 0000000..7e2e006
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java
@@ -0,0 +1,102 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionHelper} implementation that uses the {@link IMetaStoreClient meta store} to both create partitions
+ * and obtain information concerning partitions. Exercise care when using this from within workers that are running in a
+ * cluster as it may overwhelm the meta store database instance. As an alternative, consider using the
+ * {@link WarehousePartitionHelper}, collecting the affected partitions as an output of your merge job, and then
+ * retrospectively adding partitions in your client.
+ */
+class MetaStorePartitionHelper implements PartitionHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetaStorePartitionHelper.class);
+
+  private final IMetaStoreClient metaStoreClient;
+  private final String databaseName;
+  private final String tableName;
+  private final Path tablePath;
+
+  MetaStorePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName, Path tablePath) {
+    this.metaStoreClient = metaStoreClient;
+    this.tablePath = tablePath;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+  }
+
+  /** Returns the expected {@link Path} for a given partition value. */
+  @Override
+  public Path getPathForPartition(List<String> newPartitionValues) throws WorkerException {
+    if (newPartitionValues.isEmpty()) {
+      LOG.debug("Using path {} for unpartitioned table {}.{}", tablePath, databaseName, tableName);
+      return tablePath;
+    } else {
+      try {
+        String location = metaStoreClient
+            .getPartition(databaseName, tableName, newPartitionValues)
+            .getSd()
+            .getLocation();
+        LOG.debug("Found path {} for partition {}", location, newPartitionValues);
+        return new Path(location);
+      } catch (NoSuchObjectException e) {
+        throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e);
+      } catch (TException e) {
+        throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
+            + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
+      }
+    }
+  }
+
+  /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. */
+  @Override
+  public void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
+    if (newPartitionValues.isEmpty()) {
+      return;
+    }
+
+    try {
+      LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
+      Table table = metaStoreClient.getTable(databaseName, tableName);
+
+      Partition partition = new Partition();
+      partition.setDbName(table.getDbName());
+      partition.setTableName(table.getTableName());
+      StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+      partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+          + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
+      partition.setSd(partitionSd);
+      partition.setValues(newPartitionValues);
+
+      metaStoreClient.add_partition(partition);
+    } catch (AlreadyExistsException e) {
+      LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues);
+    } catch (NoSuchObjectException e) {
+      LOG.error("Failed to create partition : " + newPartitionValues, e);
+      throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e);
+    } catch (TException e) {
+      LOG.error("Failed to create partition : " + newPartitionValues, e);
+      throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '"
+          + databaseName + "." + tableName + "'", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    metaStoreClient.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
index 96f05e5..eaed09e 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -11,7 +11,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -40,13 +39,12 @@ public class MutatorCoordinator implements Closeable, Flushable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class);
 
-  private final IMetaStoreClient metaStoreClient;
   private final MutatorFactory mutatorFactory;
   private final GroupingValidator groupingValidator;
   private final SequenceValidator sequenceValidator;
   private final AcidTable table;
   private final RecordInspector recordInspector;
-  private final CreatePartitionHelper partitionHelper;
+  private final PartitionHelper partitionHelper;
   private final AcidOutputFormat<?, ?> outputFormat;
   private final BucketIdResolver bucketIdResolver;
   private final HiveConf configuration;
@@ -57,18 +55,16 @@ public class MutatorCoordinator implements Closeable, Flushable {
   private Path partitionPath;
   private Mutator mutator;
 
-  MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+  MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper,
       AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
-    this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient,
-        table.getDatabaseName(), table.getTableName()), new GroupingValidator(), new SequenceValidator(), table,
+    this(configuration, mutatorFactory, partitionHelper, new GroupingValidator(), new SequenceValidator(), table,
         deleteDeltaIfExists);
   }
 
   /** Visible for testing only. */
-  MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
-      CreatePartitionHelper partitionHelper, GroupingValidator groupingValidator, SequenceValidator sequenceValidator,
-      AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
-    this.metaStoreClient = metaStoreClient;
+  MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper,
+      GroupingValidator groupingValidator, SequenceValidator sequenceValidator, AcidTable table,
+      boolean deleteDeltaIfExists) throws WorkerException {
     this.configuration = configuration;
     this.mutatorFactory = mutatorFactory;
     this.partitionHelper = partitionHelper;
@@ -156,7 +152,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
         mutator.close();
       }
     } finally {
-      metaStoreClient.close();
+      partitionHelper.close();
     }
   }
 
@@ -178,7 +174,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
 
     try {
       if (partitionHasChanged(newPartitionValues)) {
-        if (table.createPartitions()) {
+        if (table.createPartitions() && operationType == OperationType.INSERT) {
           partitionHelper.createPartitionIfNotExists(newPartitionValues);
         }
         Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues);
@@ -265,6 +261,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
     }
   }
 
+  /* A delta may be present from a previous failed task attempt. */
   private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException {
     Path deltaPath = AcidUtils.createFilename(partitionPath,
         new AcidOutputFormat.Options(configuration)

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
index 8851ea6..cd28e02 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
@@ -1,9 +1,13 @@
 package org.apache.hive.hcatalog.streaming.mutate.worker;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.common.HCatUtil;
@@ -57,20 +61,41 @@ public class MutatorCoordinatorBuilder {
   }
 
   public MutatorCoordinator build() throws WorkerException, MetaException {
-    String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
-    boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
-
     configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri);
 
-    IMetaStoreClient metaStoreClient;
+    PartitionHelper partitionHelper;
+    if (table.createPartitions()) {
+      partitionHelper = newMetaStorePartitionHelper();
+    } else {
+      partitionHelper = newWarehousePartitionHelper();
+    }
+
+    return new MutatorCoordinator(configuration, mutatorFactory, partitionHelper, table, deleteDeltaIfExists);
+  }
+
+  private PartitionHelper newWarehousePartitionHelper() throws MetaException, WorkerException {
+    String location = table.getTable().getSd().getLocation();
+    Path tablePath = new Path(location);
+    List<FieldSchema> partitionFields = table.getTable().getPartitionKeys();
+    List<String> partitionColumns = new ArrayList<>(partitionFields.size());
+    for (FieldSchema field : partitionFields) {
+      partitionColumns.add(field.getName());
+    }
+    return new WarehousePartitionHelper(configuration, tablePath, partitionColumns);
+  }
+
+  private PartitionHelper newMetaStorePartitionHelper() throws MetaException, WorkerException {
+    String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
+    boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
     try {
-      metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode)
-          .newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+      IMetaStoreClient metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser,
+          user, secureMode).newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+      String tableLocation = table.getTable().getSd().getLocation();
+      Path tablePath = new Path(tableLocation);
+      return new MetaStorePartitionHelper(metaStoreClient, table.getDatabaseName(), table.getTableName(), tablePath);
     } catch (IOException e) {
       throw new WorkerException("Could not create meta store client.", e);
     }
-
-    return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, table, deleteDeltaIfExists);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java
new file mode 100644
index 0000000..d70207a
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java
@@ -0,0 +1,17 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
+/** Implementations are responsible for creating and obtaining path information about partitions. */
+interface PartitionHelper extends Closeable {
+
+  /** Return the location of the partition described by the provided values. */
+  Path getPathForPartition(List<String> newPartitionValues) throws WorkerException;
+
+  /** Create the partition described by the provided values if it does not exist already. */
+  void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java
new file mode 100644
index 0000000..c2edee3
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java
@@ -0,0 +1,69 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * A {@link PartitionHelper} implementation that uses the {@link Warehouse} class to obtain partition path information.
+ * As this does not require a connection to the meta store database it is safe to use in workers that are distributed on
+ * a cluster. However, it does not support the creation of new partitions so you will need to provide a mechanism to
+ * collect affected partitions in your merge job and create them from your client.
+ */
+class WarehousePartitionHelper implements PartitionHelper {
+
+  private final Warehouse warehouse;
+  private final Path tablePath;
+  private final LinkedHashMap<String, String> partitions;
+  private final List<String> partitionColumns;
+
+  WarehousePartitionHelper(Configuration configuration, Path tablePath, List<String> partitionColumns)
+      throws MetaException {
+    this.tablePath = tablePath;
+    this.partitionColumns = partitionColumns;
+    this.partitions = new LinkedHashMap<>(partitionColumns.size());
+    for (String partitionColumn : partitionColumns) {
+      partitions.put(partitionColumn, null);
+    }
+    warehouse = new Warehouse(configuration);
+  }
+
+  @Override
+  public Path getPathForPartition(List<String> partitionValues) throws WorkerException {
+    if (partitionValues.size() != partitionColumns.size()) {
+      throw new IllegalArgumentException("Incorrect number of partition values. columns=" + partitionColumns
+          + ",values=" + partitionValues);
+    }
+    if (partitionColumns.isEmpty()) {
+      return tablePath;
+    }
+    for (int columnIndex = 0; columnIndex < partitionValues.size(); columnIndex++) {
+      String partitionColumn = partitionColumns.get(columnIndex);
+      String partitionValue = partitionValues.get(columnIndex);
+      partitions.put(partitionColumn, partitionValue);
+    }
+    try {
+      return warehouse.getPartitionPath(tablePath, partitions);
+    } catch (MetaException e) {
+      throw new WorkerException("Unable to determine partition path. tablePath=" + tablePath + ",partition="
+          + partitionValues, e);
+    }
+  }
+
+  /** Throws {@link UnsupportedOperationException}. */
+  @Override
+  public void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
+    throw new UnsupportedOperationException("You require a connection to the meta store to do this.");
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to close here.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java
new file mode 100644
index 0000000..cc4173e
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java
@@ -0,0 +1,112 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMetaStorePartitionHelper {
+
+  private static final Path TABLE_PATH = new Path("table");
+  private static final String TABLE_LOCATION = TABLE_PATH.toString();
+
+  private static final FieldSchema PARTITION_KEY_A = new FieldSchema("A", "string", null);
+  private static final FieldSchema PARTITION_KEY_B = new FieldSchema("B", "string", null);
+  private static final List<FieldSchema> PARTITION_KEYS = Arrays.asList(PARTITION_KEY_A, PARTITION_KEY_B);
+  private static final Path PARTITION_PATH = new Path(TABLE_PATH, "a=1/b=2");
+  private static final String PARTITION_LOCATION = PARTITION_PATH.toString();
+
+  private static final String DATABASE_NAME = "db";
+  private static final String TABLE_NAME = "one";
+
+  private static final List<String> UNPARTITIONED_VALUES = Collections.emptyList();
+  private static final List<String> PARTITIONED_VALUES = Arrays.asList("1", "2");
+
+  @Mock
+  private IMetaStoreClient mockClient;
+  @Mock
+  private Table mockTable;
+  private StorageDescriptor tableStorageDescriptor = new StorageDescriptor();
+
+  @Mock
+  private Partition mockPartition;
+  @Mock
+  private StorageDescriptor mockPartitionStorageDescriptor;
+  @Captor
+  private ArgumentCaptor<Partition> partitionCaptor;
+
+  private PartitionHelper helper;
+
+  @Before
+  public void injectMocks() throws Exception {
+    when(mockClient.getTable(DATABASE_NAME, TABLE_NAME)).thenReturn(mockTable);
+    when(mockTable.getDbName()).thenReturn(DATABASE_NAME);
+    when(mockTable.getTableName()).thenReturn(TABLE_NAME);
+    when(mockTable.getPartitionKeys()).thenReturn(PARTITION_KEYS);
+    when(mockTable.getSd()).thenReturn(tableStorageDescriptor);
+    tableStorageDescriptor.setLocation(TABLE_LOCATION);
+
+    when(mockClient.getPartition(DATABASE_NAME, TABLE_NAME, PARTITIONED_VALUES)).thenReturn(mockPartition);
+    when(mockPartition.getSd()).thenReturn(mockPartitionStorageDescriptor);
+    when(mockPartitionStorageDescriptor.getLocation()).thenReturn(PARTITION_LOCATION);
+
+    helper = new MetaStorePartitionHelper(mockClient, DATABASE_NAME, TABLE_NAME, TABLE_PATH);
+  }
+
+  @Test
+  public void getPathForUnpartitionedTable() throws Exception {
+    Path path = helper.getPathForPartition(UNPARTITIONED_VALUES);
+    assertThat(path, is(TABLE_PATH));
+    verifyZeroInteractions(mockClient);
+  }
+
+  @Test
+  public void getPathForPartitionedTable() throws Exception {
+    Path path = helper.getPathForPartition(PARTITIONED_VALUES);
+    assertThat(path, is(PARTITION_PATH));
+  }
+
+  @Test
+  public void createOnUnpartitionTableDoesNothing() throws Exception {
+    helper.createPartitionIfNotExists(UNPARTITIONED_VALUES);
+    verifyZeroInteractions(mockClient);
+  }
+
+  @Test
+  public void createOnPartitionTable() throws Exception {
+    helper.createPartitionIfNotExists(PARTITIONED_VALUES);
+
+    verify(mockClient).add_partition(partitionCaptor.capture());
+    Partition actual = partitionCaptor.getValue();
+    assertThat(actual.getSd().getLocation(), is(PARTITION_LOCATION));
+    assertThat(actual.getValues(), is(PARTITIONED_VALUES));
+  }
+
+  @Test
+  public void closeSucceeds() throws IOException {
+    helper.close();
+    verify(mockClient).close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
index 6e9ffa2..2983d12 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -2,8 +2,10 @@ package org.apache.hive.hcatalog.streaming.mutate.worker;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -15,7 +17,6 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
@@ -42,11 +43,9 @@ public class TestMutatorCoordinator {
   private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L);
 
   @Mock
-  private IMetaStoreClient mockMetaStoreClient;
-  @Mock
   private MutatorFactory mockMutatorFactory;
   @Mock
-  private CreatePartitionHelper mockPartitionHelper;
+  private PartitionHelper mockPartitionHelper;
   @Mock
   private GroupingValidator mockGroupingValidator;
   @Mock
@@ -79,8 +78,8 @@ public class TestMutatorCoordinator {
     when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(true);
     when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(true);
 
-    coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper,
-        mockGroupingValidator, mockSequenceValidator, mockAcidTable, false);
+    coordinator = new MutatorCoordinator(configuration, mockMutatorFactory, mockPartitionHelper, mockGroupingValidator,
+        mockSequenceValidator, mockAcidTable, false);
   }
 
   @Test
@@ -127,7 +126,6 @@ public class TestMutatorCoordinator {
     coordinator.update(UNPARTITIONED, RECORD);
     coordinator.delete(UNPARTITIONED, RECORD);
 
-    verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
     verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutatorFactory)
         .newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
@@ -145,12 +143,11 @@ public class TestMutatorCoordinator {
     when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
     when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
 
-    coordinator.update(PARTITION_A, RECORD);
-    coordinator.delete(PARTITION_B, RECORD);
-    coordinator.update(PARTITION_B, RECORD);
-    coordinator.insert(PARTITION_B, RECORD);
+    coordinator.update(PARTITION_A, RECORD); /* PaB0 */
+    coordinator.insert(PARTITION_B, RECORD); /* PbB0 */
+    coordinator.delete(PARTITION_B, RECORD); /* PbB0 */
+    coordinator.update(PARTITION_B, RECORD); /* PbB1 */
 
-    verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
     verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
     verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
@@ -163,6 +160,18 @@ public class TestMutatorCoordinator {
     verify(mockSequenceValidator, times(4)).reset();
   }
 
+  @Test
+  public void partitionThenBucketChangesNoCreateAsPartitionEstablished() throws Exception {
+    when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_INSERT);
+    when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 0);
+    when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+    // Delete first, then insert, both against PARTITION_B with the bucket id stubbed to 0.
+    coordinator.delete(PARTITION_B, RECORD); /* PbB0 */
+    coordinator.insert(PARTITION_B, RECORD); /* PbB0 */
+    // No partition creation may be requested for any value list.
+    verify(mockPartitionHelper, never()).createPartitionIfNotExists(anyList());
+  }
+
   @Test(expected = RecordSequenceException.class)
   public void outOfSequence() throws Exception {
     when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false);
@@ -175,14 +184,14 @@ public class TestMutatorCoordinator {
     verify(mockMutator).update(RECORD);
     verify(mockMutator).delete(RECORD);
   }
-  
+
   @Test(expected = GroupRevisitedException.class)
   public void revisitGroup() throws Exception {
     when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(false);
-    
+
     coordinator.update(UNPARTITIONED, RECORD);
     coordinator.delete(UNPARTITIONED, RECORD);
-    
+
     verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
     verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
     verify(mockMutator).update(RECORD);
@@ -230,5 +239,6 @@ public class TestMutatorCoordinator {
     coordinator.close();
 
     verify(mockMutator).close();
+    verify(mockPartitionHelper).close();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
new file mode 100644
index 0000000..e779771
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
@@ -0,0 +1,57 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/** Unit tests for WarehousePartitionHelper: one unpartitioned table, one partitioned on columns A and B. */
+public class TestWarehousePartitionHelper {
+
+  private static final Configuration CONFIGURATION = new Configuration();
+  private static final Path TABLE_PATH = new Path("table");
+
+  private static final List<String> UNPARTITIONED_COLUMNS = Collections.emptyList();
+  private static final List<String> UNPARTITIONED_VALUES = Collections.emptyList();
+
+  private static final List<String> PARTITIONED_COLUMNS = Arrays.asList("A", "B");
+  private static final List<String> PARTITIONED_VALUES = Arrays.asList("1", "2");
+
+  private final PartitionHelper unpartitioned;
+  private final PartitionHelper partitioned;
+
+  public TestWarehousePartitionHelper() throws Exception {
+    unpartitioned = new WarehousePartitionHelper(CONFIGURATION, TABLE_PATH, UNPARTITIONED_COLUMNS);
+    partitioned = new WarehousePartitionHelper(CONFIGURATION, TABLE_PATH, PARTITIONED_COLUMNS);
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void createNotSupported() throws Exception {
+    unpartitioned.createPartitionIfNotExists(UNPARTITIONED_VALUES);
+  }
+
+  @Test
+  public void getPathForUnpartitionedTable() throws Exception {
+    // With no partition values the table path itself is the expected answer.
+    assertThat(unpartitioned.getPathForPartition(UNPARTITIONED_VALUES), is(TABLE_PATH));
+  }
+
+  @Test
+  public void getPathForPartitionedTable() throws Exception {
+    Path expected = new Path(TABLE_PATH, "A=1/B=2");
+    assertThat(partitioned.getPathForPartition(PARTITIONED_VALUES), is(expected));
+  }
+
+  @Test
+  public void closeSucceeds() throws IOException {
+    partitioned.close();
+    unpartitioned.close();
+  }
+}