You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2015/07/22 22:48:07 UTC
[39/50] [abbrv] hive git commit: HIVE-11229 Mutation API: Coordinator
communication with meta store should be optional (Elliot West via gates)
HIVE-11229 Mutation API: Coordinator communication with meta store should be optional (Elliot West via gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6ec72de7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6ec72de7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6ec72de7
Branch: refs/heads/hbase-metastore
Commit: 6ec72de79ebb898f699402e8a2d7681c4e39ecd2
Parents: dfdc670
Author: Alan Gates <ga...@hortonworks.com>
Authored: Tue Jul 21 11:08:59 2015 -0700
Committer: Alan Gates <ga...@hortonworks.com>
Committed: Tue Jul 21 11:08:59 2015 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/mutate/package.html | 31 ++++-
.../mutate/worker/CreatePartitionHelper.java | 83 --------------
.../mutate/worker/MetaStorePartitionHelper.java | 102 +++++++++++++++++
.../mutate/worker/MutatorCoordinator.java | 21 ++--
.../worker/MutatorCoordinatorBuilder.java | 41 +++++--
.../mutate/worker/PartitionHelper.java | 17 +++
.../mutate/worker/WarehousePartitionHelper.java | 69 ++++++++++++
.../worker/TestMetaStorePartitionHelper.java | 112 +++++++++++++++++++
.../mutate/worker/TestMutatorCoordinator.java | 40 ++++---
.../worker/TestWarehousePartitionHelper.java | 57 ++++++++++
10 files changed, 452 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
index 09a55b6..72ce6b1 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html
@@ -416,17 +416,39 @@ data, is the responsibility of the client using the API.
</p>
<h3>Dynamic Partition Creation:</h3>
+<p>
It is very likely to be desirable to have new partitions created
automatically (say on a hourly basis). In such cases requiring the Hive
-admin to pre-create the necessary partitions may not be reasonable.
-Consequently the API allows coordinators to create partitions as needed
-(see:
+admin to pre-create the necessary partitions may not be reasonable. The
+API allows coordinators to create partitions as needed (see:
<code>MutatorClientBuilder.addSinkTable(String, String, boolean)</code>
). Partition creation being an atomic action, multiple coordinators can
race to create the partition, but only one would succeed, so
coordinators clients need not synchronize when creating a partition. The
user of the coordinator process needs to be given write permissions on
the Hive table in order to create partitions.
+</p>
+
+<p>Care must be taken when using this option as it requires that the
+coordinators maintain a connection with the meta store database. When
+coordinators are running in a distributed environment (as is likely the
+case) it is possible for them to overwhelm the meta store. In such cases it
+may be better to disable partition creation and collect a set of
+affected partitions as part of your ETL merge process. These can then be
+created with a single meta store connection in your client code, once
+the cluster side merge process is complete.</p>
+<p>
+Finally, note that when partition creation is disabled the coordinators
+must synthesize the partition URI as they cannot retrieve it from the
+meta store. This may cause problems if the layout of your partitions in
+HDFS does not follow the Hive standard (as implemented in
+<code>
+org.apache.hadoop.hive.metastore.Warehouse.getPartitionPath(Path,
+LinkedHashMap
+&lt;String, String&gt;)
+</code>
+).
+</p>
<h2>Reading data</h2>
@@ -473,6 +495,7 @@ table. The <code>AcidTableSerializer</code> can help you transport the <code>Aci
when your workers are in a distributed environment.
</li>
<li>Compute your mutation set (this is your ETL merge process).</li>
+<li>Optionally: collect the set of affected partitions.</li>
<li>Append bucket ids to insertion records. A <code>BucketIdResolver</code>
can help here.
</li>
@@ -481,6 +504,8 @@ can help here.
<li>Close your coordinators.</li>
<li>Abort or commit the transaction.</li>
<li>Close your mutation client.</li>
+<li>Optionally: create any affected partitions that do not exist in
+the meta store.</li>
</ol>
<p>
See
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
deleted file mode 100644
index 9aab346..0000000
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/CreatePartitionHelper.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.hive.hcatalog.streaming.mutate.worker;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Utility class that can create new table partitions within the {@link IMetaStoreClient meta store}. */
-class CreatePartitionHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(CreatePartitionHelper.class);
-
- private final IMetaStoreClient metaStoreClient;
- private final String databaseName;
- private final String tableName;
-
- CreatePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName) {
- this.metaStoreClient = metaStoreClient;
- this.databaseName = databaseName;
- this.tableName = tableName;
- }
-
- /** Returns the expected {@link Path} for a given partition value. */
- Path getPathForPartition(List<String> newPartitionValues) throws WorkerException {
- try {
- String location;
- if (newPartitionValues.isEmpty()) {
- location = metaStoreClient.getTable(databaseName, tableName).getSd().getLocation();
- } else {
- location = metaStoreClient.getPartition(databaseName, tableName, newPartitionValues).getSd().getLocation();
- }
- LOG.debug("Found path {} for partition {}", location, newPartitionValues);
- return new Path(location);
- } catch (NoSuchObjectException e) {
- throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e);
- } catch (TException e) {
- throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
- + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
- }
- }
-
- /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. */
- void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
- if (newPartitionValues.isEmpty()) {
- return;
- }
-
- try {
- LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
- Table table = metaStoreClient.getTable(databaseName, tableName);
-
- Partition partition = new Partition();
- partition.setDbName(table.getDbName());
- partition.setTableName(table.getTableName());
- StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
- partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
- + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
- partition.setSd(partitionSd);
- partition.setValues(newPartitionValues);
-
- metaStoreClient.add_partition(partition);
- } catch (AlreadyExistsException e) {
- LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues);
- } catch (NoSuchObjectException e) {
- LOG.error("Failed to create partition : " + newPartitionValues, e);
- throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e);
- } catch (TException e) {
- LOG.error("Failed to create partition : " + newPartitionValues, e);
- throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '"
- + databaseName + "." + tableName + "'", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java
new file mode 100644
index 0000000..7e2e006
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MetaStorePartitionHelper.java
@@ -0,0 +1,102 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionHelper} implementation that uses the {@link IMetaStoreClient meta store} to both create partitions
+ * and obtain information concerning partitions. Exercise care when using this from within workers that are running in a
+ * cluster as it may overwhelm the meta store database instance. As an alternative, consider using the
+ * {@link WarehousePartitionHelper}, collecting the affected partitions as an output of your merge job, and then
+ * retrospectively adding partitions in your client.
+ */
+class MetaStorePartitionHelper implements PartitionHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetaStorePartitionHelper.class);
+
+ private final IMetaStoreClient metaStoreClient;
+ private final String databaseName;
+ private final String tableName;
+ private final Path tablePath;
+
+ MetaStorePartitionHelper(IMetaStoreClient metaStoreClient, String databaseName, String tableName, Path tablePath) {
+ this.metaStoreClient = metaStoreClient;
+ this.tablePath = tablePath;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ }
+
+ /** Returns the expected {@link Path} for a given partition value. */
+ @Override
+ public Path getPathForPartition(List<String> newPartitionValues) throws WorkerException {
+ if (newPartitionValues.isEmpty()) {
+ LOG.debug("Using path {} for unpartitioned table {}.{}", tablePath, databaseName, tableName);
+ return tablePath;
+ } else {
+ try {
+ String location = metaStoreClient
+ .getPartition(databaseName, tableName, newPartitionValues)
+ .getSd()
+ .getLocation();
+ LOG.debug("Found path {} for partition {}", location, newPartitionValues);
+ return new Path(location);
+ } catch (NoSuchObjectException e) {
+ throw new WorkerException("Table not found '" + databaseName + "." + tableName + "'.", e);
+ } catch (TException e) {
+ throw new WorkerException("Failed to get path for partitions '" + newPartitionValues + "' on table '"
+ + databaseName + "." + tableName + "' with meta store: " + metaStoreClient, e);
+ }
+ }
+ }
+
+ /** Creates the specified partition if it does not already exist. Does nothing if the table is unpartitioned. */
+ @Override
+ public void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
+ if (newPartitionValues.isEmpty()) {
+ return;
+ }
+
+ try {
+ LOG.debug("Attempting to create partition (if not exists) {}.{}:{}", databaseName, tableName, newPartitionValues);
+ Table table = metaStoreClient.getTable(databaseName, tableName);
+
+ Partition partition = new Partition();
+ partition.setDbName(table.getDbName());
+ partition.setTableName(table.getTableName());
+ StorageDescriptor partitionSd = new StorageDescriptor(table.getSd());
+ partitionSd.setLocation(table.getSd().getLocation() + Path.SEPARATOR
+ + Warehouse.makePartName(table.getPartitionKeys(), newPartitionValues));
+ partition.setSd(partitionSd);
+ partition.setValues(newPartitionValues);
+
+ metaStoreClient.add_partition(partition);
+ } catch (AlreadyExistsException e) {
+ LOG.debug("Partition already exisits: {}.{}:{}", databaseName, tableName, newPartitionValues);
+ } catch (NoSuchObjectException e) {
+ LOG.error("Failed to create partition : " + newPartitionValues, e);
+ throw new PartitionCreationException("Table not found '" + databaseName + "." + tableName + "'.", e);
+ } catch (TException e) {
+ LOG.error("Failed to create partition : " + newPartitionValues, e);
+ throw new PartitionCreationException("Failed to create partition '" + newPartitionValues + "' on table '"
+ + databaseName + "." + tableName + "'", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ metaStoreClient.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
index 96f05e5..eaed09e 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
@@ -11,7 +11,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -40,13 +39,12 @@ public class MutatorCoordinator implements Closeable, Flushable {
private static final Logger LOG = LoggerFactory.getLogger(MutatorCoordinator.class);
- private final IMetaStoreClient metaStoreClient;
private final MutatorFactory mutatorFactory;
private final GroupingValidator groupingValidator;
private final SequenceValidator sequenceValidator;
private final AcidTable table;
private final RecordInspector recordInspector;
- private final CreatePartitionHelper partitionHelper;
+ private final PartitionHelper partitionHelper;
private final AcidOutputFormat<?, ?> outputFormat;
private final BucketIdResolver bucketIdResolver;
private final HiveConf configuration;
@@ -57,18 +55,16 @@ public class MutatorCoordinator implements Closeable, Flushable {
private Path partitionPath;
private Mutator mutator;
- MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
+ MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper,
AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
- this(metaStoreClient, configuration, mutatorFactory, new CreatePartitionHelper(metaStoreClient,
- table.getDatabaseName(), table.getTableName()), new GroupingValidator(), new SequenceValidator(), table,
+ this(configuration, mutatorFactory, partitionHelper, new GroupingValidator(), new SequenceValidator(), table,
deleteDeltaIfExists);
}
/** Visible for testing only. */
- MutatorCoordinator(IMetaStoreClient metaStoreClient, HiveConf configuration, MutatorFactory mutatorFactory,
- CreatePartitionHelper partitionHelper, GroupingValidator groupingValidator, SequenceValidator sequenceValidator,
- AcidTable table, boolean deleteDeltaIfExists) throws WorkerException {
- this.metaStoreClient = metaStoreClient;
+ MutatorCoordinator(HiveConf configuration, MutatorFactory mutatorFactory, PartitionHelper partitionHelper,
+ GroupingValidator groupingValidator, SequenceValidator sequenceValidator, AcidTable table,
+ boolean deleteDeltaIfExists) throws WorkerException {
this.configuration = configuration;
this.mutatorFactory = mutatorFactory;
this.partitionHelper = partitionHelper;
@@ -156,7 +152,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
mutator.close();
}
} finally {
- metaStoreClient.close();
+ partitionHelper.close();
}
}
@@ -178,7 +174,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
try {
if (partitionHasChanged(newPartitionValues)) {
- if (table.createPartitions()) {
+ if (table.createPartitions() && operationType == OperationType.INSERT) {
partitionHelper.createPartitionIfNotExists(newPartitionValues);
}
Path newPartitionPath = partitionHelper.getPathForPartition(newPartitionValues);
@@ -265,6 +261,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
}
}
+ /* A delta may be present from a previous failed task attempt. */
private void deleteDeltaIfExists(Path partitionPath, long transactionId, int bucketId) throws IOException {
Path deltaPath = AcidUtils.createFilename(partitionPath,
new AcidOutputFormat.Options(configuration)
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
index 8851ea6..cd28e02 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinatorBuilder.java
@@ -1,9 +1,13 @@
package org.apache.hive.hcatalog.streaming.mutate.worker;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.common.HCatUtil;
@@ -57,20 +61,41 @@ public class MutatorCoordinatorBuilder {
}
public MutatorCoordinator build() throws WorkerException, MetaException {
- String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
- boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
-
configuration = HiveConfFactory.newInstance(configuration, this.getClass(), metaStoreUri);
- IMetaStoreClient metaStoreClient;
+ PartitionHelper partitionHelper;
+ if (table.createPartitions()) {
+ partitionHelper = newMetaStorePartitionHelper();
+ } else {
+ partitionHelper = newWarehousePartitionHelper();
+ }
+
+ return new MutatorCoordinator(configuration, mutatorFactory, partitionHelper, table, deleteDeltaIfExists);
+ }
+
+ private PartitionHelper newWarehousePartitionHelper() throws MetaException, WorkerException {
+ String location = table.getTable().getSd().getLocation();
+ Path tablePath = new Path(location);
+ List<FieldSchema> partitionFields = table.getTable().getPartitionKeys();
+ List<String> partitionColumns = new ArrayList<>(partitionFields.size());
+ for (FieldSchema field : partitionFields) {
+ partitionColumns.add(field.getName());
+ }
+ return new WarehousePartitionHelper(configuration, tablePath, partitionColumns);
+ }
+
+ private PartitionHelper newMetaStorePartitionHelper() throws MetaException, WorkerException {
+ String user = authenticatedUser == null ? System.getProperty("user.name") : authenticatedUser.getShortUserName();
+ boolean secureMode = authenticatedUser == null ? false : authenticatedUser.hasKerberosCredentials();
try {
- metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser, user, secureMode)
- .newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+ IMetaStoreClient metaStoreClient = new UgiMetaStoreClientFactory(metaStoreUri, configuration, authenticatedUser,
+ user, secureMode).newInstance(HCatUtil.getHiveMetastoreClient(configuration));
+ String tableLocation = table.getTable().getSd().getLocation();
+ Path tablePath = new Path(tableLocation);
+ return new MetaStorePartitionHelper(metaStoreClient, table.getDatabaseName(), table.getTableName(), tablePath);
} catch (IOException e) {
throw new WorkerException("Could not create meta store client.", e);
}
-
- return new MutatorCoordinator(metaStoreClient, configuration, mutatorFactory, table, deleteDeltaIfExists);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java
new file mode 100644
index 0000000..d70207a
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/PartitionHelper.java
@@ -0,0 +1,17 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
+/** Implementations are responsible for creating and obtaining path information about partitions. */
+interface PartitionHelper extends Closeable {
+
+ /** Return the location of the partition described by the provided values. */
+ Path getPathForPartition(List<String> newPartitionValues) throws WorkerException;
+
+ /** Create the partition described by the provided values if it does not exist already. */
+ void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java
new file mode 100644
index 0000000..c2edee3
--- /dev/null
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/WarehousePartitionHelper.java
@@ -0,0 +1,69 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * A {@link PartitionHelper} implementation that uses the {@link Warehouse} class to obtain partition path information.
+ * As this does not require a connection to the meta store database it is safe to use in workers that are distributed on
+ * a cluster. However, it does not support the creation of new partitions so you will need to provide a mechanism to
+ * collect affected partitions in your merge job and create them from your client.
+ */
+class WarehousePartitionHelper implements PartitionHelper {
+
+ private final Warehouse warehouse;
+ private final Path tablePath;
+ private final LinkedHashMap<String, String> partitions;
+ private final List<String> partitionColumns;
+
+ WarehousePartitionHelper(Configuration configuration, Path tablePath, List<String> partitionColumns)
+ throws MetaException {
+ this.tablePath = tablePath;
+ this.partitionColumns = partitionColumns;
+ this.partitions = new LinkedHashMap<>(partitionColumns.size());
+ for (String partitionColumn : partitionColumns) {
+ partitions.put(partitionColumn, null);
+ }
+ warehouse = new Warehouse(configuration);
+ }
+
+ @Override
+ public Path getPathForPartition(List<String> partitionValues) throws WorkerException {
+ if (partitionValues.size() != partitionColumns.size()) {
+ throw new IllegalArgumentException("Incorrect number of partition values. columns=" + partitionColumns
+ + ",values=" + partitionValues);
+ }
+ if (partitionColumns.isEmpty()) {
+ return tablePath;
+ }
+ for (int columnIndex = 0; columnIndex < partitionValues.size(); columnIndex++) {
+ String partitionColumn = partitionColumns.get(columnIndex);
+ String partitionValue = partitionValues.get(columnIndex);
+ partitions.put(partitionColumn, partitionValue);
+ }
+ try {
+ return warehouse.getPartitionPath(tablePath, partitions);
+ } catch (MetaException e) {
+ throw new WorkerException("Unable to determine partition path. tablePath=" + tablePath + ",partition="
+ + partitionValues, e);
+ }
+ }
+
+ /** Throws {@link UnsupportedOperationException}. */
+ @Override
+ public void createPartitionIfNotExists(List<String> newPartitionValues) throws WorkerException {
+ throw new UnsupportedOperationException("You require a connection to the meta store to do this.");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to close here.
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java
new file mode 100644
index 0000000..cc4173e
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMetaStorePartitionHelper.java
@@ -0,0 +1,112 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestMetaStorePartitionHelper {
+
+ private static final Path TABLE_PATH = new Path("table");
+ private static final String TABLE_LOCATION = TABLE_PATH.toString();
+
+ private static final FieldSchema PARTITION_KEY_A = new FieldSchema("A", "string", null);
+ private static final FieldSchema PARTITION_KEY_B = new FieldSchema("B", "string", null);
+ private static final List<FieldSchema> PARTITION_KEYS = Arrays.asList(PARTITION_KEY_A, PARTITION_KEY_B);
+ private static final Path PARTITION_PATH = new Path(TABLE_PATH, "a=1/b=2");
+ private static final String PARTITION_LOCATION = PARTITION_PATH.toString();
+
+ private static final String DATABASE_NAME = "db";
+ private static final String TABLE_NAME = "one";
+
+ private static final List<String> UNPARTITIONED_VALUES = Collections.emptyList();
+ private static final List<String> PARTITIONED_VALUES = Arrays.asList("1", "2");
+
+ @Mock
+ private IMetaStoreClient mockClient;
+ @Mock
+ private Table mockTable;
+ private StorageDescriptor tableStorageDescriptor = new StorageDescriptor();
+
+ @Mock
+ private Partition mockPartition;
+ @Mock
+ private StorageDescriptor mockPartitionStorageDescriptor;
+ @Captor
+ private ArgumentCaptor<Partition> partitionCaptor;
+
+ private PartitionHelper helper;
+
+ @Before
+ public void injectMocks() throws Exception {
+ when(mockClient.getTable(DATABASE_NAME, TABLE_NAME)).thenReturn(mockTable);
+ when(mockTable.getDbName()).thenReturn(DATABASE_NAME);
+ when(mockTable.getTableName()).thenReturn(TABLE_NAME);
+ when(mockTable.getPartitionKeys()).thenReturn(PARTITION_KEYS);
+ when(mockTable.getSd()).thenReturn(tableStorageDescriptor);
+ tableStorageDescriptor.setLocation(TABLE_LOCATION);
+
+ when(mockClient.getPartition(DATABASE_NAME, TABLE_NAME, PARTITIONED_VALUES)).thenReturn(mockPartition);
+ when(mockPartition.getSd()).thenReturn(mockPartitionStorageDescriptor);
+ when(mockPartitionStorageDescriptor.getLocation()).thenReturn(PARTITION_LOCATION);
+
+ helper = new MetaStorePartitionHelper(mockClient, DATABASE_NAME, TABLE_NAME, TABLE_PATH);
+ }
+
+ @Test
+ public void getPathForUnpartitionedTable() throws Exception {
+ Path path = helper.getPathForPartition(UNPARTITIONED_VALUES);
+ assertThat(path, is(TABLE_PATH));
+ verifyZeroInteractions(mockClient);
+ }
+
+ @Test
+ public void getPathForPartitionedTable() throws Exception {
+ Path path = helper.getPathForPartition(PARTITIONED_VALUES);
+ assertThat(path, is(PARTITION_PATH));
+ }
+
+ @Test
+ public void createOnUnpartitionTableDoesNothing() throws Exception {
+ helper.createPartitionIfNotExists(UNPARTITIONED_VALUES);
+ verifyZeroInteractions(mockClient);
+ }
+
+ @Test
+ public void createOnPartitionTable() throws Exception {
+ helper.createPartitionIfNotExists(PARTITIONED_VALUES);
+
+ verify(mockClient).add_partition(partitionCaptor.capture());
+ Partition actual = partitionCaptor.getValue();
+ assertThat(actual.getSd().getLocation(), is(PARTITION_LOCATION));
+ assertThat(actual.getValues(), is(PARTITIONED_VALUES));
+ }
+
+ @Test
+ public void closeSucceeds() throws IOException {
+ helper.close();
+ verify(mockClient).close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
index 6e9ffa2..2983d12 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorCoordinator.java
@@ -2,8 +2,10 @@ package org.apache.hive.hcatalog.streaming.mutate.worker;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@@ -15,7 +17,6 @@ import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hive.hcatalog.streaming.mutate.client.AcidTable;
@@ -42,11 +43,9 @@ public class TestMutatorCoordinator {
private static final RecordIdentifier ROW__ID_INSERT = new RecordIdentifier(-1L, BUCKET_ID, -1L);
@Mock
- private IMetaStoreClient mockMetaStoreClient;
- @Mock
private MutatorFactory mockMutatorFactory;
@Mock
- private CreatePartitionHelper mockPartitionHelper;
+ private PartitionHelper mockPartitionHelper;
@Mock
private GroupingValidator mockGroupingValidator;
@Mock
@@ -79,8 +78,8 @@ public class TestMutatorCoordinator {
when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(true);
when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(true);
- coordinator = new MutatorCoordinator(mockMetaStoreClient, configuration, mockMutatorFactory, mockPartitionHelper,
- mockGroupingValidator, mockSequenceValidator, mockAcidTable, false);
+ coordinator = new MutatorCoordinator(configuration, mockMutatorFactory, mockPartitionHelper, mockGroupingValidator,
+ mockSequenceValidator, mockAcidTable, false);
}
@Test
@@ -127,7 +126,6 @@ public class TestMutatorCoordinator {
coordinator.update(UNPARTITIONED, RECORD);
coordinator.delete(UNPARTITIONED, RECORD);
- verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutatorFactory)
.newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID + 1));
@@ -145,12 +143,11 @@ public class TestMutatorCoordinator {
when(mockPartitionHelper.getPathForPartition(PARTITION_A)).thenReturn(PATH_A);
when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
- coordinator.update(PARTITION_A, RECORD);
- coordinator.delete(PARTITION_B, RECORD);
- coordinator.update(PARTITION_B, RECORD);
- coordinator.insert(PARTITION_B, RECORD);
+ coordinator.update(PARTITION_A, RECORD); /* PaB0 */
+ coordinator.insert(PARTITION_B, RECORD); /* PbB0 */
+ coordinator.delete(PARTITION_B, RECORD); /* PbB0 */
+ coordinator.update(PARTITION_B, RECORD); /* PbB1 */
- verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_A);
verify(mockPartitionHelper).createPartitionIfNotExists(PARTITION_B);
verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutatorFactory, times(2)).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_B),
@@ -163,6 +160,18 @@ public class TestMutatorCoordinator {
verify(mockSequenceValidator, times(4)).reset();
}
+ /**
+ * A delete followed by an insert, both in PARTITION_B and bucket 0, must never invoke
+ * {@code createPartitionIfNotExists} on the partition helper.
+ */
+ @Test
+ public void partitionThenBucketChangesNoCreateAsPartitionEstablished() throws Exception {
+ when(mockPartitionHelper.getPathForPartition(PARTITION_B)).thenReturn(PATH_B);
+ when(mockBucketIdResolver.computeBucketId(RECORD)).thenReturn(0, 0);
+ when(mockRecordInspector.extractRecordIdentifier(RECORD)).thenReturn(ROW__ID_B0_R0, ROW__ID_INSERT);
+
+ coordinator.delete(PARTITION_B, RECORD); // partition B, bucket 0
+ coordinator.insert(PARTITION_B, RECORD); // partition B, bucket 0 (same as above)
+
+ verify(mockPartitionHelper, never()).createPartitionIfNotExists(anyList());
+ }
+
@Test(expected = RecordSequenceException.class)
public void outOfSequence() throws Exception {
when(mockSequenceValidator.isInSequence(any(RecordIdentifier.class))).thenReturn(false);
@@ -175,14 +184,14 @@ public class TestMutatorCoordinator {
verify(mockMutator).update(RECORD);
verify(mockMutator).delete(RECORD);
}
-
+
@Test(expected = GroupRevisitedException.class)
public void revisitGroup() throws Exception {
when(mockGroupingValidator.isInSequence(any(List.class), anyInt())).thenReturn(false);
-
+
coordinator.update(UNPARTITIONED, RECORD);
coordinator.delete(UNPARTITIONED, RECORD);
-
+
verify(mockPartitionHelper).createPartitionIfNotExists(UNPARTITIONED);
verify(mockMutatorFactory).newMutator(any(OrcOutputFormat.class), eq(TRANSACTION_ID), eq(PATH_A), eq(BUCKET_ID));
verify(mockMutator).update(RECORD);
@@ -230,5 +239,6 @@ public class TestMutatorCoordinator {
coordinator.close();
verify(mockMutator).close();
+ verify(mockPartitionHelper).close();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/6ec72de7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
new file mode 100644
index 0000000..e779771
--- /dev/null
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestWarehousePartitionHelper.java
@@ -0,0 +1,57 @@
+package org.apache.hive.hcatalog.streaming.mutate.worker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Tests for {@link WarehousePartitionHelper}. The helper is built from a configuration, a table path and the
+ * partition column names rather than from a meta store client.
+ */
+public class TestWarehousePartitionHelper {
+
+ private static final Configuration CONFIGURATION = new Configuration();
+ private static final Path TABLE_PATH = new Path("table");
+
+ private static final List<String> UNPARTITIONED_COLUMNS = Collections.emptyList();
+ private static final List<String> UNPARTITIONED_VALUES = Collections.emptyList();
+
+ private static final List<String> PARTITIONED_COLUMNS = Arrays.asList("A", "B");
+ private static final List<String> PARTITIONED_VALUES = Arrays.asList("1", "2");
+
+ /** Helper for a table with no partition columns. */
+ private final PartitionHelper noPartitionsHelper;
+ /** Helper for a table partitioned by columns A and B. */
+ private final PartitionHelper twoColumnHelper;
+
+ public TestWarehousePartitionHelper() throws Exception {
+ noPartitionsHelper = new WarehousePartitionHelper(CONFIGURATION, TABLE_PATH, UNPARTITIONED_COLUMNS);
+ twoColumnHelper = new WarehousePartitionHelper(CONFIGURATION, TABLE_PATH, PARTITIONED_COLUMNS);
+ }
+
+ /** This helper does not support partition creation and signals that with an exception. */
+ @Test(expected = UnsupportedOperationException.class)
+ public void createNotSupported() throws Exception {
+ noPartitionsHelper.createPartitionIfNotExists(UNPARTITIONED_VALUES);
+ }
+
+ /** With no partition values the table path itself is returned. */
+ @Test
+ public void getPathForUnpartitionedTable() throws Exception {
+ assertThat(noPartitionsHelper.getPathForPartition(UNPARTITIONED_VALUES), is(TABLE_PATH));
+ }
+
+ /** Partition values become column=value path segments beneath the table path. */
+ @Test
+ public void getPathForPartitionedTable() throws Exception {
+ Path expected = new Path(TABLE_PATH, "A=1/B=2");
+ assertThat(twoColumnHelper.getPathForPartition(PARTITIONED_VALUES), is(expected));
+ }
+
+ /** Closing either helper must complete without throwing. */
+ @Test
+ public void closeSucceeds() throws IOException {
+ twoColumnHelper.close();
+ noPartitionsHelper.close();
+ }
+
+}