You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/11/02 06:20:15 UTC
svn commit: r1636085 - in /hive/branches/branch-0.14/hcatalog:
server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/
server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/
webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/
webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/
Author: gunther
Date: Sun Nov 2 05:20:15 2014
New Revision: 1636085
URL: http://svn.apache.org/r1636085
Log:
HIVE-7576: Add PartitionSpec support in HCatClient API (Mithun Radhakrishnan, reviewed by Sushanth Sowmyan)
Added:
hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java
Modified:
hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java
hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java
hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
Modified: hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java (original)
+++ hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java Sun Nov 2 05:20:15 2014
@@ -19,10 +19,13 @@
package org.apache.hive.hcatalog.messaging;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.hcatalog.messaging.json.JSONMessageFactory;
@@ -131,6 +134,16 @@ public abstract class MessageFactory {
public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List<Partition> partitions);
/**
+ * Factory method for AddPartitionMessage, taking the added partitions as a PartitionSpecProxy
+ * instead of as a List of Partitions.
+ * @param table The Table to which the partitions are added.
+ * @param partitionSpec The PartitionSpecProxy describing the set of Partitions being added.
+ * @return AddPartitionMessage instance.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec);
+
+ /**
* Factory method for DropPartitionMessage.
* @param table The Table from which the partition is dropped.
* @param partition The Partition being dropped.
Modified: hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java (original)
+++ hive/branches/branch-0.14/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java Sun Nov 2 05:20:15 2014
@@ -19,9 +19,12 @@
package org.apache.hive.hcatalog.messaging.json;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage;
import org.apache.hive.hcatalog.messaging.CreateTableMessage;
@@ -87,6 +90,14 @@ public class JSONMessageFactory extends
}
@Override
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec) {
+ return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
+ table.getTableName(), getPartitionKeyValues(table, partitionSpec), System.currentTimeMillis()/1000);
+ }
+
+ @Override
public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) {
return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)),
@@ -107,4 +118,16 @@ public class JSONMessageFactory extends
partitionList.add(getPartitionKeyValues(table, partition));
return partitionList;
}
+
+  /**
+   * Collects the partition key/value map of every Partition reachable through the spec's iterator.
+   * @param table The Table the partitions belong to.
+   * @param partitionSpec The PartitionSpecProxy whose partitions are described.
+   * @return One key/value map per partition, in iteration order.
+   */
+  private static List<Map<String, String>> getPartitionKeyValues(Table table, PartitionSpecProxy partitionSpec) {
+    List<Map<String, String>> partitionList = new ArrayList<Map<String, String>>();
+    PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+    while (iterator.hasNext()) {
+      partitionList.add(getPartitionKeyValues(table, iterator.next()));
+    }
+    return partitionList;
+  }
}
Modified: hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java Sun Nov 2 05:20:15 2014
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hive.hcatalog.common.HCatException;
@@ -213,6 +215,26 @@ public abstract class HCatClient {
public abstract List<HCatPartition> deserializePartitions(List<String> hcatPartitionStringReps) throws HCatException;
/**
+ * Serializer for HCatPartitionSpec.
+ * The result can be turned back into an HCatPartitionSpec with {@link #deserializePartitionSpec(List)}.
+ * @param partitionSpec HCatPartitionSpec to be serialized.
+ * @return A list of Strings, representing the HCatPartitionSpec as a whole.
+ * @throws HCatException On failure to serialize.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract List<String> serializePartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException;
+
+ /**
+ * Deserializer for HCatPartitionSpec, the inverse of {@link #serializePartitionSpec(HCatPartitionSpec)}.
+ * (HCatClientHMSImpl additionally binds the result to the HCatTable it looks up by DB and table name.)
+ * @param hcatPartitionSpecStrings List of strings, representing the HCatPartitionSpec as a whole.
+ * @return HCatPartitionSpec, reconstructed from the list of strings.
+ * @throws HCatException On failure to deserialize.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException;
+
+ /**
* Creates the table like an existing table.
*
* @param dbName The name of the database.
@@ -280,6 +302,21 @@ public abstract class HCatClient {
throws HCatException;
/**
+ * Gets the partitions of a table, in terms of a generic HCatPartitionSpec instance.
+ * @param dbName The database name.
+ * @param tableName The table name.
+ * @param maxPartitions Limit on the number of partitions to fetch.
+ * @return HCatPartitionSpec representing the table's partitions.
+ * @throws HCatException On failure to retrieve the partitions.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, int maxPartitions) throws HCatException;
+
+ /**
+ * Gets the partitions of a table that match a partition key/value selector,
+ * in terms of a generic HCatPartitionSpec instance.
+ * @param dbName The database name.
+ * @param tableName The table name.
+ * @param partitionSelector Map of partition-key names to values, used to select partitions.
+ * @param maxPartitions Limit on the number of partitions to fetch.
+ * @return HCatPartitionSpec representing the matching partitions.
+ * @throws HCatException On failure to retrieve the partitions.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, Map<String, String> partitionSelector, int maxPartitions)
+ throws HCatException;
+
+ /**
* Gets the partition.
*
* @param dbName The database name.
@@ -312,6 +349,17 @@ public abstract class HCatClient {
throws HCatException;
/**
+ * Adds partitions using HCatPartitionSpec, e.g. one obtained from getPartitionSpecs()
+ * or deserializePartitionSpec().
+ * @param partitionSpec The HCatPartitionSpec representing the set of partitions added.
+ * @return The number of partitions added.
+ * @throws HCatException On failure to add partitions.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract int addPartitionSpec(HCatPartitionSpec partitionSpec)
+ throws HCatException;
+
+ /**
* Drops partition(s) that match the specified (and possibly partial) partition specification.
* A partial partition-specification is one where not all partition-keys have associated values. For example,
* for a table ('myDb.myTable') with 2 partition keys (dt string, region string),
@@ -344,6 +392,14 @@ public abstract class HCatClient {
String filter) throws HCatException;
/**
+ * List partitions by filter, but as HCatPartitionSpecs.
+ * @param dbName The database name.
+ * @param tblName The table name.
+ * @param filter The metastore filter expression that the returned partitions must satisfy.
+ * @param maxPartitions Limit on the number of partitions to fetch.
+ * @return HCatPartitionSpec representing the matching partitions.
+ * @throws HCatException On failure to list the partitions.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract HCatPartitionSpec listPartitionSpecsByFilter(String dbName, String tblName,
+ String filter, int maxPartitions) throws HCatException;
+
+ /**
* Mark partition for event.
*
* @param dbName The database name.
Modified: hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java Sun Nov 2 05:20:15 2014
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -354,6 +356,31 @@ public class HCatClientHMSImpl extends H
return listPartitionsByFilter(dbName, tblName, getFilterString(partitionSpec));
}
+ @Override
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, int maxPartitions) throws HCatException {
+ try {
+ return new HCatPartitionSpec(getTable(dbName, tableName),
+ hmsClient.listPartitionSpecs(dbName, tableName, maxPartitions));
+ }
+ catch (NoSuchObjectException e) {
+ throw new ObjectNotFoundException(
+ "NoSuchObjectException while retrieving partition.", e);
+ } catch (MetaException e) {
+ throw new HCatException(
+ "MetaException while retrieving partition.", e);
+ } catch (TException e) {
+ throw new ConnectionFailureException(
+ "TException while retrieving partition.", e);
+ }
+ }
+
+ @Override
+ public HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, Map<String, String> partitionSelector, int maxPartitions) throws HCatException {
+ return listPartitionSpecsByFilter(dbName, tableName, getFilterString(partitionSelector), maxPartitions);
+ }
+
private static String getFilterString(Map<String, String> partitionSpec) {
final String AND = " AND ";
@@ -414,7 +441,7 @@ public class HCatClientHMSImpl extends H
Table tbl = null;
try {
tbl = hmsClient.getTable(partInfo.getDatabaseName(),
- partInfo.getTableName());
+ partInfo.getTableName());
// TODO: Should be moved out.
if (tbl.getPartitionKeysSize() == 0) {
throw new HCatException("The table " + partInfo.getTableName()
@@ -512,6 +539,28 @@ public class HCatClientHMSImpl extends H
}
@Override
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public HCatPartitionSpec listPartitionSpecsByFilter(String dbName, String tblName, String filter, int maxPartitions)
+ throws HCatException {
+ try {
+ return new HCatPartitionSpec(getTable(dbName, tblName),
+ hmsClient.listPartitionSpecsByFilter(dbName, tblName, filter, maxPartitions));
+ }
+ catch(MetaException e) {
+ throw new HCatException("MetaException while fetching partitions.", e);
+ }
+ catch (NoSuchObjectException e) {
+ throw new ObjectNotFoundException(
+ "NoSuchObjectException while fetching partitions.", e);
+ }
+ catch (TException e) {
+ throw new ConnectionFailureException(
+ "TException while fetching partitions.", e);
+ }
+ }
+
+ @Override
public void markPartitionForEvent(String dbName, String tblName,
Map<String, String> partKVs, PartitionEventType eventType)
throws HCatException {
@@ -573,7 +622,7 @@ public class HCatClientHMSImpl extends H
String token = null;
try {
token = hmsClient.getDelegationToken(owner,
- renewerKerberosPrincipalName);
+ renewerKerberosPrincipalName);
} catch (MetaException e) {
throw new HCatException(
"MetaException while getting delegation token.", e);
@@ -751,6 +800,30 @@ public class HCatClientHMSImpl extends H
}
@Override
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public int addPartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException {
+    // Adds every partition of the spec in a single metastore call; returns the count added.
+    try {
+      return hmsClient.add_partitions_pspec(partitionSpec.toPartitionSpecProxy());
+    } catch (InvalidObjectException e) {
+      throw new HCatException(
+          "InvalidObjectException while adding partition.", e);
+    } catch (AlreadyExistsException e) {
+      throw new HCatException(
+          "AlreadyExistsException while adding partition.", e);
+    } catch (MetaException e) {
+      throw new HCatException("MetaException while adding partition.", e);
+    } catch (NoSuchObjectException e) {
+      // Name the missing table, so the failure is diagnosable from the message alone.
+      throw new ObjectNotFoundException("The table " + partitionSpec.getDbName() + "."
+          + partitionSpec.getTableName() + " could not be found.", e);
+    } catch (TException e) {
+      throw new ConnectionFailureException(
+          "TException while adding partition.", e);
+    }
+  }
+
+ @Override
public String getMessageBusTopicName(String dbName, String tableName) throws HCatException {
try {
return hmsClient.getTable(dbName, tableName).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
@@ -825,4 +898,16 @@ public class HCatClientHMSImpl extends H
}
return partitions;
}
+
+ @Override
+ public List<String> serializePartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException {
+ return MetadataSerializer.get().serializePartitionSpec(partitionSpec);
+ }
+
+  /**
+   * Deserializes via MetadataSerializer, then binds the result to the HCatTable fetched from
+   * this metastore by the spec's DB and table names.
+   */
+  @Override
+  public HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException {
+    HCatPartitionSpec spec = MetadataSerializer.get().deserializePartitionSpec(hcatPartitionSpecStrings);
+    // The serialized form carries no HCatTable; look it up and attach it.
+    HCatTable table = getTable(spec.getDbName(), spec.getTableName());
+    spec.hcatTable(table);
+    return spec;
+  }
}
Added: hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java?rev=1636085&view=auto
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java (added)
+++ hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java Sun Nov 2 05:20:15 2014
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hive.hcatalog.common.HCatException;
+
+/**
+ * Generalized representation of a set of HCatPartitions, backed by a metastore PartitionSpecProxy.
+ * An instance may be constructed without an HCatTable (as happens on deserialization) and bound to
+ * one later through {@link #hcatTable(HCatTable)}. Whenever a table is bound, its DB and table names
+ * must match those of the PartitionSpecProxy (compared ignoring case).
+ */
+@InterfaceAudience.LimitedPrivate({"Hive"})
+@InterfaceStability.Evolving
+public class HCatPartitionSpec {
+
+  protected HCatTable hcatTable;
+  protected PartitionSpecProxy partitionSpecProxy;
+
+  /**
+   * @param hcatTable The HCatTable owning the partitions, or null if it is to be bound later.
+   * @param partitionSpecProxy The metastore representation of the partitions; must not be null.
+   * @throws HCatException If partitionSpecProxy is null, or hcatTable does not match it.
+   */
+  protected HCatPartitionSpec(HCatTable hcatTable, PartitionSpecProxy partitionSpecProxy) throws HCatException {
+    if (partitionSpecProxy == null) {
+      throw new HCatException("Invalid HCatPartitionSpec instance: PartitionSpecProxy must not be null.");
+    }
+    this.partitionSpecProxy = partitionSpecProxy;
+    checkTableMatchesSpec(hcatTable);
+    this.hcatTable = hcatTable;
+  }
+
+  /**
+   * Getter for DBName of this HCatPartitionSpec.
+   * @return The name of the DB.
+   */
+  public String getDbName() {
+    return partitionSpecProxy.getDbName();
+  }
+
+  /**
+   * Getter for TableName of this HCatPartitionSpec.
+   * @return The name of the table.
+   */
+  public String getTableName() {
+    return partitionSpecProxy.getTableName();
+  }
+
+  /**
+   * Setter for HCatTable. Required for deserialization, where the spec is first built without a table.
+   * @param hcatTable The HCatTable to bind; its DB and table names must match this spec's.
+   * @throws HCatException If hcatTable does not match the spec's DB or table name.
+   */
+  void hcatTable(HCatTable hcatTable) throws HCatException {
+    assert this.hcatTable == null : "Expected hcatTable to be null at this point.";
+    // Validate before assigning, so a mismatched table is never left bound to this spec.
+    checkTableMatchesSpec(hcatTable);
+    this.hcatTable = hcatTable;
+  }
+
+  /**
+   * Conversion to a Hive Metastore API PartitionSpecProxy instance.
+   * @return The backing PartitionSpecProxy (not a copy).
+   */
+  PartitionSpecProxy toPartitionSpecProxy() {
+    return partitionSpecProxy;
+  }
+
+  /**
+   * Getter for the number of HCatPartitions represented by this HCatPartitionSpec instance.
+   * @return The number of HCatPartitions.
+   * @throws HCatException On failure.
+   */
+  public int size() throws HCatException {
+    return partitionSpecProxy.size();
+  }
+
+  /**
+   * Setter for the "root" location of the HCatPartitionSpec.
+   * @param location The new "root" location of the HCatPartitionSpec.
+   * @throws HCatException On failure to set a new location.
+   */
+  public void setRootLocation(String location) throws HCatException {
+    try {
+      partitionSpecProxy.setRootLocation(location);
+    } catch (MetaException metaException) {
+      throw new HCatException("Unable to set root-path!", metaException);
+    }
+  }
+
+  /**
+   * Getter for an Iterator over the HCatPartitions in the HCatPartitionSpec.
+   * NOTE(review): each HCatPartition is built with the currently bound HCatTable; if no table has
+   * been bound yet (e.g. straight after deserialization), that table is null — confirm callers bind first.
+   * @return HCatPartitionIterator positioned before the first HCatPartition.
+   */
+  public HCatPartitionIterator getPartitionIterator() {
+    return new HCatPartitionIterator(hcatTable, partitionSpecProxy.getPartitionIterator());
+  }
+
+  /**
+   * Checks the class invariant for a (candidate) bound table: a null table is always acceptable;
+   * otherwise its DB and table names must match the PartitionSpecProxy's, ignoring case.
+   * A mismatch is always reported as an HCatException, whether or not assertions (-ea) are enabled.
+   */
+  private void checkTableMatchesSpec(HCatTable table) throws HCatException {
+    if (table == null) {
+      return;
+    }
+
+    if (!table.getDbName().equalsIgnoreCase(partitionSpecProxy.getDbName())) {
+      throw new HCatException("Invalid HCatPartitionSpec instance: Table's DBName (" + table.getDbName() + ") "
+          + "doesn't match PartitionSpec (" + partitionSpecProxy.getDbName() + ")");
+    }
+
+    if (!table.getTableName().equalsIgnoreCase(partitionSpecProxy.getTableName())) {
+      throw new HCatException("Invalid HCatPartitionSpec instance: Table's TableName (" + table.getTableName() + ") "
+          + "doesn't match PartitionSpec (" + partitionSpecProxy.getTableName() + ")");
+    }
+  }
+
+  /**
+   * Iterator over HCatPartitions in the HCatPartitionSpec.
+   * This cannot implement java.util.Iterator, because next() declares the checked HCatException.
+   */
+  public static class HCatPartitionIterator {
+
+    private final HCatTable hcatTable;
+    private final PartitionSpecProxy.PartitionIterator iterator;
+
+    HCatPartitionIterator(HCatTable hcatTable, PartitionSpecProxy.PartitionIterator iterator) {
+      this.hcatTable = hcatTable;
+      this.iterator = iterator;
+    }
+
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    public HCatPartition next() throws HCatException {
+      return new HCatPartition(hcatTable, iterator.next());
+    }
+
+    public void remove() {
+      iterator.remove();
+    }
+
+  } // class HCatPartitionIterator;
+
+} // class HCatPartitionSpec;
Modified: hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java Sun Nov 2 05:20:15 2014
@@ -1,7 +1,11 @@
package org.apache.hive.hcatalog.api;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -10,6 +14,9 @@ import org.apache.thrift.protocol.TJSONP
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* MetadataSerializer implementation, that serializes HCat API elements into JSON.
*/
@@ -68,4 +75,38 @@ class MetadataJSONSerializer extends Met
throw new HCatException("Could not de-serialize HCatPartition.", exception);
}
}
+
+ @Override
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public List<String> serializePartitionSpec(HCatPartitionSpec hcatPartitionSpec) throws HCatException {
+ try {
+ List<String> stringReps = new ArrayList<String>();
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ for (PartitionSpec partitionSpec : hcatPartitionSpec.partitionSpecProxy.toPartitionSpec()) {
+ stringReps.add(serializer.toString(partitionSpec, "UTF-8"));
+ }
+ return stringReps;
+ }
+ catch (TException serializationException) {
+ throw new HCatException("Failed to serialize!", serializationException);
+ }
+ }
+
+ @Override
+ public HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException {
+ try {
+ List<PartitionSpec> partitionSpecList = new ArrayList<PartitionSpec>();
+ TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+ for (String stringRep : hcatPartitionSpecStrings) {
+ PartitionSpec partSpec = new PartitionSpec();
+ deserializer.deserialize(partSpec, stringRep, "UTF-8");
+ partitionSpecList.add(partSpec);
+ }
+ return new HCatPartitionSpec(null, PartitionSpecProxy.Factory.get(partitionSpecList));
+ }
+ catch (TException deserializationException) {
+ throw new HCatException("Failed to deserialize!", deserializationException);
+ }
+ }
}
Modified: hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java Sun Nov 2 05:20:15 2014
@@ -1,7 +1,11 @@
package org.apache.hive.hcatalog.api;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hive.hcatalog.common.HCatException;
+import java.util.List;
+
/**
* Interface to serialize HCat API elements.
*/
@@ -51,4 +55,24 @@ abstract class MetadataSerializer {
*/
public abstract HCatPartition deserializePartition(String hcatPartitionStringRep) throws HCatException;
+ /**
+ * Serializer for HCatPartitionSpec.
+ * (The JSON implementation emits one string per underlying metastore PartitionSpec.)
+ * @param hcatPartitionSpec HCatPartitionSpec instance to be serialized.
+ * @return Serialized string-representations.
+ * @throws HCatException On failure to serialize.
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract List<String> serializePartitionSpec(HCatPartitionSpec hcatPartitionSpec) throws HCatException;
+
+ /**
+ * Deserializer for HCatPartitionSpec string-representations, the inverse of serializePartitionSpec().
+ * (The JSON implementation returns an instance not yet bound to an HCatTable.)
+ * @param hcatPartitionSpecStrings List of strings to be converted into an HCatPartitionSpec.
+ * @return Deserialized HCatPartitionSpec instance.
+ * @throws HCatException On failure to deserialize. (e.g. incompatible serialization format, etc.)
+ */
+ @InterfaceAudience.LimitedPrivate({"Hive"})
+ @InterfaceStability.Evolving
+ public abstract HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException;
+
}
Modified: hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java?rev=1636085&r1=1636084&r2=1636085&view=diff
==============================================================================
--- hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (original)
+++ hive/branches/branch-0.14/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java Sun Nov 2 05:20:15 2014
@@ -970,7 +970,7 @@ public class TestHCatClient {
sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build());
// The source table now has 2 partitions, one in TEXTFILE, the other in ORC.
- // Test that adding these partitions to the target-table *without* replicating the table-change.
+ // Test adding these partitions to the target-table *without* replicating the table-change.
List<HCatPartition> sourcePartitions = sourceMetaStore.getPartitions(dbName, tableName);
assertEquals("Unexpected number of source partitions.", 2, sourcePartitions.size());
@@ -1002,4 +1002,139 @@ public class TestHCatClient {
assertTrue("Unexpected exception! " + unexpected.getMessage(), false);
}
}
+
+ /**
+ * Test that partition-definitions can be replicated between HCat-instances,
+ * independently of table-metadata replication, using PartitionSpec interfaces.
+ * (This is essentially the same test as testPartitionRegistrationWithCustomSchema(),
+ * transliterated to use the PartitionSpec APIs.)
+ * 2 identical tables are created on 2 different HCat instances ("source" and "target").
+ * On the source instance,
+ * 1. One partition is added with the old format ("TEXTFILE").
+ * 2. The table is updated with an additional column and the data-format changed to ORC.
+ * 3. Another partition is added with the new format.
+ * 4. The partitions' metadata is copied to the target HCat instance, without updating the target table definition.
+ * 5. The partitions' metadata (columns, I/O formats, SerDe and SerDe params) is checked against that on the source.
+ * @throws Exception Declared for the test framework; unexpected exceptions are caught, logged and reported as assertion failures.
+ */
+ @Test
+ public void testPartitionSpecRegistrationWithCustomSchema() throws Exception {
+ try {
+ startReplicationTargetMetaStoreIfRequired();
+
+ HCatClient sourceMetaStore = HCatClient.create(new Configuration(hcatConf));
+ final String dbName = "myDb";
+ final String tableName = "myTable";
+
+ sourceMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+
+ sourceMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build());
+ List<HCatFieldSchema> columnSchema = new ArrayList<HCatFieldSchema>(
+ Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""),
+ new HCatFieldSchema("bar", Type.STRING, "")));
+
+ List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""),
+ new HCatFieldSchema("grid", Type.STRING, ""));
+
+ HCatTable sourceTable = new HCatTable(dbName, tableName).cols(columnSchema)
+ .partCols(partitionSchema)
+ .comment("Source table.");
+
+ sourceMetaStore.createTable(HCatCreateTableDesc.create(sourceTable).build());
+
+ // Verify that the sourceTable was created successfully.
+ sourceTable = sourceMetaStore.getTable(dbName, tableName);
+ assertNotNull("Table couldn't be queried for. ", sourceTable);
+
+ // Partitions added now should inherit table-schema, properties, etc.
+ Map<String, String> partitionSpec_1 = new HashMap<String, String>();
+ partitionSpec_1.put("grid", "AB");
+ partitionSpec_1.put("dt", "2011_12_31");
+ HCatPartition sourcePartition_1 = new HCatPartition(sourceTable, partitionSpec_1, "");
+
+ sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_1).build());
+ assertEquals("Unexpected number of partitions. ",
+ 1, sourceMetaStore.getPartitions(dbName, tableName).size());
+ // Verify that partition_1 was added correctly, and properties were inherited from the HCatTable.
+ HCatPartition addedPartition_1 = sourceMetaStore.getPartition(dbName, tableName, partitionSpec_1);
+ assertEquals("Column schema doesn't match.", sourceTable.getCols(), addedPartition_1.getColumns());
+ assertEquals("InputFormat doesn't match.", sourceTable.getInputFileFormat(), addedPartition_1.getInputFormat());
+ assertEquals("OutputFormat doesn't match.", sourceTable.getOutputFileFormat(), addedPartition_1.getOutputFormat());
+ assertEquals("SerDe doesn't match.", sourceTable.getSerdeLib(), addedPartition_1.getSerDe());
+ assertEquals("SerDe params don't match.", sourceTable.getSerdeParams(), addedPartition_1.getSerdeParams());
+
+ // Replicate table definition.
+
+ HCatClient targetMetaStore = HCatClient.create(new Configuration(replicationTargetHCatConf));
+ targetMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+
+ targetMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build());
+ // Make a copy of the source-table, as would be done across class-loaders.
+ HCatTable targetTable = targetMetaStore.deserializeTable(sourceMetaStore.serializeTable(sourceTable));
+ targetMetaStore.createTable(HCatCreateTableDesc.create(targetTable).build());
+ targetTable = targetMetaStore.getTable(dbName, tableName);
+
+ assertEquals("Created table doesn't match the source.",
+ HCatTable.NO_DIFF, targetTable.diff(sourceTable));
+
+ // Modify Table schema at the source.
+ List<HCatFieldSchema> newColumnSchema = new ArrayList<HCatFieldSchema>(columnSchema);
+ newColumnSchema.add(new HCatFieldSchema("goo_new", Type.DOUBLE, ""));
+ Map<String, String> tableParams = new HashMap<String, String>(1);
+ tableParams.put("orc.compress", "ZLIB");
+ sourceTable.cols(newColumnSchema) // Add a column.
+ .fileFormat("orcfile") // Change SerDe, File I/O formats.
+ .tblProps(tableParams)
+ .serdeParam(serdeConstants.FIELD_DELIM, Character.toString('\001'));
+ sourceMetaStore.updateTableSchema(dbName, tableName, sourceTable);
+ sourceTable = sourceMetaStore.getTable(dbName, tableName);
+
+ // Add another partition to the source.
+ Map<String, String> partitionSpec_2 = new HashMap<String, String>();
+ partitionSpec_2.put("grid", "AB");
+ partitionSpec_2.put("dt", "2012_01_01");
+ HCatPartition sourcePartition_2 = new HCatPartition(sourceTable, partitionSpec_2, "");
+ sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build());
+
+ // The source table now has 2 partitions, one in TEXTFILE, the other in ORC.
+ // Test adding these partitions to the target-table *without* replicating the table-change.
+
+ HCatPartitionSpec sourcePartitionSpec = sourceMetaStore.getPartitionSpecs(dbName, tableName, -1);
+ assertEquals("Unexpected number of source partitions.", 2, sourcePartitionSpec.size());
+
+ // Serialize the hcatPartitionSpec.
+ List<String> partitionSpecString = sourceMetaStore.serializePartitionSpec(sourcePartitionSpec);
+
+ // Deserialize the HCatPartitionSpec using the target HCatClient instance.
+ HCatPartitionSpec targetPartitionSpec = targetMetaStore.deserializePartitionSpec(partitionSpecString);
+ assertEquals("Could not add the expected number of partitions.",
+ sourcePartitionSpec.size(), targetMetaStore.addPartitionSpec(targetPartitionSpec));
+
+ // Retrieve partitions.
+ targetPartitionSpec = targetMetaStore.getPartitionSpecs(dbName, tableName, -1);
+ assertEquals("Could not retrieve the expected number of partitions.",
+ sourcePartitionSpec.size(), targetPartitionSpec.size());
+
+ // Assert that source and target partitions are equivalent, pairwise (assumes both specs iterate in the same order).
+ HCatPartitionSpec.HCatPartitionIterator sourceIterator = sourcePartitionSpec.getPartitionIterator();
+ HCatPartitionSpec.HCatPartitionIterator targetIterator = targetPartitionSpec.getPartitionIterator();
+
+ while (targetIterator.hasNext()) {
+ assertTrue("Fewer source partitions than target.", sourceIterator.hasNext());
+ HCatPartition sourcePartition = sourceIterator.next();
+ HCatPartition targetPartition = targetIterator.next();
+ assertEquals("Column schema doesn't match.", sourcePartition.getColumns(), targetPartition.getColumns());
+ assertEquals("InputFormat doesn't match.", sourcePartition.getInputFormat(), targetPartition.getInputFormat());
+ assertEquals("OutputFormat doesn't match.", sourcePartition.getOutputFormat(), targetPartition.getOutputFormat());
+ assertEquals("SerDe doesn't match.", sourcePartition.getSerDe(), targetPartition.getSerDe());
+ assertEquals("SerDe params don't match.", sourcePartition.getSerdeParams(), targetPartition.getSerdeParams());
+ }
+ assertTrue("Fewer target partitions than source.", !sourceIterator.hasNext());
+ }
+ catch (Exception unexpected) {
+ LOG.error("Unexpected exception! ", unexpected);
+ assertTrue("Unexpected exception! " + unexpected.getMessage(), false);
+ }
+ }
+
}