You are viewing a plain-text version of this content. The link to the canonical version (originally a hyperlink on "here") was not preserved in this copy.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/11/02 06:15:53 UTC

svn commit: r1636083 - in /hive/trunk/hcatalog: server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/ server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/ webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/...

Author: gunther
Date: Sun Nov  2 05:15:53 2014
New Revision: 1636083

URL: http://svn.apache.org/r1636083
Log:
HIVE-7576: Add PartitionSpec support in HCatClient API (Mithun Radhakrishnan, reviewed by Sushanth Sowmyan)

Added:
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java
Modified:
    hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
    hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java
    hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java

Modified: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java (original)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/MessageFactory.java Sun Nov  2 05:15:53 2014
@@ -19,10 +19,13 @@
 
 package org.apache.hive.hcatalog.messaging;
 
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.hcatalog.messaging.json.JSONMessageFactory;
 
@@ -131,6 +134,16 @@ public abstract class MessageFactory {
     public abstract AddPartitionMessage buildAddPartitionMessage(Table table, List<Partition> partitions);
 
   /**
+   * Factory method for AddPartitionMessage.
+   * @param table The Table to which the partitions are added.
+   * @param partitionSpec The set of Partitions being added.
+   * @return AddPartitionMessage instance.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec);
+
+  /**
    * Factory method for DropPartitionMessage.
    * @param table The Table from which the partition is dropped.
    * @param partition The Partition being dropped.

Modified: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java (original)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/messaging/json/JSONMessageFactory.java Sun Nov  2 05:15:53 2014
@@ -19,9 +19,12 @@
 
 package org.apache.hive.hcatalog.messaging.json;
 
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hive.hcatalog.messaging.AddPartitionMessage;
 import org.apache.hive.hcatalog.messaging.CreateDatabaseMessage;
 import org.apache.hive.hcatalog.messaging.CreateTableMessage;
@@ -87,6 +90,14 @@ public class JSONMessageFactory extends 
   }
 
   @Override
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public AddPartitionMessage buildAddPartitionMessage(Table table, PartitionSpecProxy partitionSpec) {
+    return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
+        table.getTableName(), getPartitionKeyValues(table, partitionSpec), System.currentTimeMillis()/1000);
+  }
+
+  @Override
   public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) {
     return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
         partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)),
@@ -107,4 +118,16 @@ public class JSONMessageFactory extends 
       partitionList.add(getPartitionKeyValues(table, partition));
     return partitionList;
   }
+
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  private static List<Map<String, String>> getPartitionKeyValues(Table table, PartitionSpecProxy partitionSpec) {
+    List<Map<String, String>> partitionList = new ArrayList<Map<String, String>>();
+    PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator();
+    while (iterator.hasNext()) {
+      Partition partition = iterator.next();
+      partitionList.add(getPartitionKeyValues(table, partition));
+    }
+    return partitionList;
+  }
 }

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java Sun Nov  2 05:15:53 2014
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hive.hcatalog.common.HCatException;
@@ -213,6 +215,26 @@ public abstract class HCatClient {
   public abstract List<HCatPartition> deserializePartitions(List<String> hcatPartitionStringReps) throws HCatException;
 
   /**
+   * Serializer for HCatPartitionSpec.
+   * @param partitionSpec HCatPartitionSpec to be serialized.
+   * @return A list of Strings, representing the HCatPartitionSpec as a whole.
+   * @throws HCatException On failure to serialize.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract List<String> serializePartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException;
+
+  /**
+   * Deserializer for HCatPartitionSpec.
+   * @param hcatPartitionSpecStrings List of strings, representing the HCatPartitionSpec as a whole.
+   * @return HCatPartitionSpec, reconstructed from the list of strings.
+   * @throws HCatException On failure to deserialize.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException;
+
+  /**
    * Creates the table like an existing table.
    *
    * @param dbName The name of the database.
@@ -280,6 +302,21 @@ public abstract class HCatClient {
     throws HCatException;
 
   /**
+   * Gets partitions in terms of generic HCatPartitionSpec instances.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, int maxPartitions) throws HCatException;
+
+  /**
+   * Gets partitions in terms of generic HCatPartitionSpec instances.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, Map<String, String> partitionSelector, int maxPartitions)
+    throws HCatException;
+
+  /**
    * Gets the partition.
    *
    * @param dbName The database name.
@@ -312,6 +349,17 @@ public abstract class HCatClient {
     throws HCatException;
 
   /**
+   * Adds partitions using HCatPartitionSpec.
+   * @param partitionSpec The HCatPartitionSpec representing the set of partitions added.
+   * @return The number of partitions added.
+   * @throws HCatException On failure to add partitions.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract int addPartitionSpec(HCatPartitionSpec partitionSpec)
+    throws HCatException;
+
+  /**
    * Drops partition(s) that match the specified (and possibly partial) partition specification.
    * A partial partition-specification is one where not all partition-keys have associated values. For example,
    * for a table ('myDb.myTable') with 2 partition keys (dt string, region string),
@@ -344,6 +392,14 @@ public abstract class HCatClient {
                                  String filter) throws HCatException;
 
   /**
+   * List partitions by filter, but as HCatPartitionSpecs.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract HCatPartitionSpec listPartitionSpecsByFilter(String dbName, String tblName,
+                                                               String filter, int maxPartitions) throws HCatException;
+
+  /**
    * Mark partition for event.
    *
    * @param dbName The database name.

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java Sun Nov  2 05:15:53 2014
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -353,6 +355,31 @@ public class HCatClientHMSImpl extends H
     return listPartitionsByFilter(dbName, tblName, getFilterString(partitionSpec));
   }
 
+  @Override
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, int maxPartitions) throws HCatException {
+    try {
+      return new HCatPartitionSpec(getTable(dbName, tableName),
+                                   hmsClient.listPartitionSpecs(dbName, tableName, maxPartitions));
+    }
+    catch (NoSuchObjectException e) {
+      throw new ObjectNotFoundException(
+          "NoSuchObjectException while retrieving partition.", e);
+    } catch (MetaException e) {
+      throw new HCatException(
+          "MetaException while retrieving partition.", e);
+    } catch (TException e) {
+      throw new ConnectionFailureException(
+          "TException while retrieving partition.", e);
+    }
+  }
+
+  @Override
+  public HCatPartitionSpec getPartitionSpecs(String dbName, String tableName, Map<String, String> partitionSelector, int maxPartitions) throws HCatException {
+    return listPartitionSpecsByFilter(dbName, tableName, getFilterString(partitionSelector), maxPartitions);
+  }
+
   private static String getFilterString(Map<String, String> partitionSpec) {
     final String AND = " AND ";
 
@@ -413,7 +440,7 @@ public class HCatClientHMSImpl extends H
     Table tbl = null;
     try {
       tbl = hmsClient.getTable(partInfo.getDatabaseName(),
-        partInfo.getTableName());
+          partInfo.getTableName());
       // TODO: Should be moved out.
       if (tbl.getPartitionKeysSize() == 0) {
         throw new HCatException("The table " + partInfo.getTableName()
@@ -511,6 +538,28 @@ public class HCatClientHMSImpl extends H
   }
 
   @Override
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public HCatPartitionSpec listPartitionSpecsByFilter(String dbName, String tblName, String filter, int maxPartitions)
+      throws HCatException {
+    try {
+      return new HCatPartitionSpec(getTable(dbName, tblName),
+                                   hmsClient.listPartitionSpecsByFilter(dbName, tblName, filter, maxPartitions));
+    }
+    catch(MetaException e) {
+      throw new HCatException("MetaException while fetching partitions.", e);
+    }
+    catch (NoSuchObjectException e) {
+      throw new ObjectNotFoundException(
+          "NoSuchObjectException while fetching partitions.", e);
+    }
+    catch (TException e) {
+      throw new ConnectionFailureException(
+        "TException while fetching partitions.", e);
+    }
+  }
+
+  @Override
   public void markPartitionForEvent(String dbName, String tblName,
                     Map<String, String> partKVs, PartitionEventType eventType)
     throws HCatException {
@@ -572,7 +621,7 @@ public class HCatClientHMSImpl extends H
     String token = null;
     try {
       token = hmsClient.getDelegationToken(owner,
-        renewerKerberosPrincipalName);
+          renewerKerberosPrincipalName);
     } catch (MetaException e) {
       throw new HCatException(
         "MetaException while getting delegation token.", e);
@@ -750,6 +799,30 @@ public class HCatClientHMSImpl extends H
   }
 
   @Override
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public int addPartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException {
+
+    try {
+      return hmsClient.add_partitions_pspec(partitionSpec.toPartitionSpecProxy());
+    } catch (InvalidObjectException e) {
+      throw new HCatException(
+          "InvalidObjectException while adding partition.", e);
+    } catch (AlreadyExistsException e) {
+      throw new HCatException(
+          "AlreadyExistsException while adding partition.", e);
+    } catch (MetaException e) {
+      throw new HCatException("MetaException while adding partition.", e);
+    } catch (NoSuchObjectException e) {
+      throw new ObjectNotFoundException("The table "
+          + "could not be found.", e);
+    } catch (TException e) {
+      throw new ConnectionFailureException(
+          "TException while adding partition.", e);
+    }
+  }
+
+  @Override
   public String getMessageBusTopicName(String dbName, String tableName) throws HCatException {
     try {
       return hmsClient.getTable(dbName, tableName).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
@@ -824,4 +897,16 @@ public class HCatClientHMSImpl extends H
     }
     return partitions;
   }
+
+  @Override
+  public List<String> serializePartitionSpec(HCatPartitionSpec partitionSpec) throws HCatException {
+    return MetadataSerializer.get().serializePartitionSpec(partitionSpec);
+  }
+
+  @Override
+  public HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException {
+    HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get().deserializePartitionSpec(hcatPartitionSpecStrings);
+    hcatPartitionSpec.hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName()));
+    return hcatPartitionSpec;
+  }
 }

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java?rev=1636083&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatPartitionSpec.java Sun Nov  2 05:15:53 2014
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hive.hcatalog.common.HCatException;
+
+/**
+ * Generalized representation of a set of HCatPartitions, backed by a Hive Metastore PartitionSpecProxy.
+ */
+
+@InterfaceAudience.LimitedPrivate({"Hive"})
+@InterfaceStability.Evolving
+public class HCatPartitionSpec {
+
+  protected HCatTable hcatTable;
+  protected PartitionSpecProxy partitionSpecProxy;
+  // hcatTable may be null (e.g. after deserialization); the name invariant is only checked once it is set.
+  protected HCatPartitionSpec(HCatTable hcatTable, PartitionSpecProxy partitionSpecProxy) throws HCatException {
+    this.hcatTable = hcatTable;
+    this.partitionSpecProxy = partitionSpecProxy;
+    assert_invariant();
+  }
+
+  /**
+   * Getter for the DB name of this HCatPartitionSpec, as reported by the underlying PartitionSpecProxy.
+   * @return The name of the database.
+   */
+  public String getDbName() {
+    return partitionSpecProxy.getDbName();
+  }
+
+  /**
+   * Getter for the table name of this HCatPartitionSpec, as reported by the underlying PartitionSpecProxy.
+   * @return The name of the table.
+   */
+  public String getTableName() {
+    return partitionSpecProxy.getTableName();
+  }
+
+  /**
+   * Setter for HCatTable, used after deserialization (which yields a null table). Re-checks the name invariant.
+   */
+  void hcatTable(HCatTable hcatTable) throws HCatException {
+    // Only a single assignment is expected; this is enforced only when assertions are enabled.
+    assert this.hcatTable == null : "Expected hcatTable to be null at this point.";
+    this.hcatTable = hcatTable;
+    assert_invariant();
+
+  }
+
+  /**
+   * Conversion to a Hive Metastore API PartitionSpecProxy instance. Returns the wrapped proxy itself, not a copy.
+   */
+  PartitionSpecProxy toPartitionSpecProxy() {
+    return partitionSpecProxy;
+  }
+
+  /**
+   * Getter for the number of HCatPartitions represented by this HCatPartitionSpec instance.
+   * @return The number of HCatPartitions.
+   * @throws HCatException On failure.
+   */
+  public int size() throws HCatException {
+    return partitionSpecProxy.size();
+  }
+
+  /**
+   * Setter for the "root" location of the HCatPartitionSpec (delegated to the PartitionSpecProxy).
+   * @param location The new "root" location of the HCatPartitionSpec.
+   * @throws HCatException On failure to set a new location (wraps the proxy's MetaException).
+   */
+  public void setRootLocation(String location) throws HCatException {
+    try {
+      partitionSpecProxy.setRootLocation(location);
+    }
+    catch (MetaException metaException) {
+      throw new HCatException("Unable to set root-path!", metaException);
+    }
+  }
+
+  /**
+   * Getter for an Iterator over the HCatPartitions in this HCatPartitionSpec, starting at the first one.
+   * @return HCatPartitionIterator to the first HCatPartition.
+   */
+  public HCatPartitionIterator getPartitionIterator() {
+    return new HCatPartitionIterator(hcatTable, partitionSpecProxy.getPartitionIterator());
+  }
+
+  // Assert class invariant: if hcatTable is set, its DB and table names must match the proxy's (ignoring case).
+  private void assert_invariant() throws HCatException {
+
+    if (hcatTable != null) {
+      // With assertions enabled (-ea), each 'assert false' below fires before the HCatException is thrown.
+      if (!hcatTable.getDbName().equalsIgnoreCase(partitionSpecProxy.getDbName())) {
+        String errorMessage = "Invalid HCatPartitionSpec instance: Table's DBName (" + hcatTable.getDbName() + ") " +
+            "doesn't match PartitionSpec (" + partitionSpecProxy.getDbName() + ")";
+        assert false : errorMessage;
+        throw new HCatException(errorMessage);
+      }
+
+      if (!hcatTable.getTableName().equalsIgnoreCase(partitionSpecProxy.getTableName())) {
+        String errorMessage = "Invalid HCatPartitionSpec instance: Table's TableName (" + hcatTable.getTableName() + ") " +
+            "doesn't match PartitionSpec (" + partitionSpecProxy.getTableName() + ")";
+        assert false : errorMessage;
+        throw new HCatException(errorMessage);
+      }
+    }
+  }
+
+
+  /**
+   * Iterator over HCatPartitions in the HCatPartitionSpec; each partition is paired with the spec's HCatTable.
+   */
+  public static class HCatPartitionIterator { // Not a java.util.Iterator<HCatPartition>: next() declares the checked HCatException.
+
+    private HCatTable hcatTable;
+    private PartitionSpecProxy.PartitionIterator iterator;
+
+    HCatPartitionIterator(HCatTable hcatTable, PartitionSpecProxy.PartitionIterator iterator) {
+      this.hcatTable = hcatTable;
+      this.iterator = iterator;
+    }
+
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    public HCatPartition next() throws HCatException {
+      return new HCatPartition(hcatTable, iterator.next());
+    }
+
+    public void remove() {
+      iterator.remove();
+    }
+
+  } // class HCatPartitionIterator;
+
+} // class HCatPartitionSpec;

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataJSONSerializer.java Sun Nov  2 05:15:53 2014
@@ -1,7 +1,11 @@
 package org.apache.hive.hcatalog.api;
 
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -10,6 +14,9 @@ import org.apache.thrift.protocol.TJSONP
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * MetadataSerializer implementation, that serializes HCat API elements into JSON.
  */
@@ -68,4 +75,38 @@ class MetadataJSONSerializer extends Met
       throw new HCatException("Could not de-serialize HCatPartition.", exception);
     }
   }
+
+  @Override
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public List<String> serializePartitionSpec(HCatPartitionSpec hcatPartitionSpec) throws HCatException {
+    try {
+      List<String> stringReps = new ArrayList<String>();
+      TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+      for (PartitionSpec partitionSpec : hcatPartitionSpec.partitionSpecProxy.toPartitionSpec()) {
+        stringReps.add(serializer.toString(partitionSpec, "UTF-8"));
+      }
+      return stringReps;
+    }
+    catch (TException serializationException) {
+      throw new HCatException("Failed to serialize!", serializationException);
+    }
+  }
+
+  @Override
+  public HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException {
+    try {
+      List<PartitionSpec> partitionSpecList = new ArrayList<PartitionSpec>();
+      TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+      for (String stringRep : hcatPartitionSpecStrings) {
+        PartitionSpec partSpec = new PartitionSpec();
+        deserializer.deserialize(partSpec, stringRep, "UTF-8");
+        partitionSpecList.add(partSpec);
+      }
+      return new HCatPartitionSpec(null, PartitionSpecProxy.Factory.get(partitionSpecList));
+    }
+    catch (TException deserializationException) {
+      throw new HCatException("Failed to deserialize!", deserializationException);
+    }
+  }
 }

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/MetadataSerializer.java Sun Nov  2 05:15:53 2014
@@ -1,7 +1,11 @@
 package org.apache.hive.hcatalog.api;
 
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hive.hcatalog.common.HCatException;
 
+import java.util.List;
+
 /**
  * Interface to serialize HCat API elements.
  */
@@ -51,4 +55,24 @@ abstract class MetadataSerializer {
    */
   public abstract HCatPartition deserializePartition(String hcatPartitionStringRep) throws HCatException;
 
+  /**
+   * Serializer for HCatPartitionSpec.
+   * @param hcatPartitionSpec HCatPartitionSpec instance to be serialized.
+   * @return Serialized string-representations.
+   * @throws HCatException On failure to serialize.
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract List<String> serializePartitionSpec(HCatPartitionSpec hcatPartitionSpec) throws HCatException;
+
+  /**
+   * Deserializer for HCatPartitionSpec string-representations.
+   * @param hcatPartitionSpecStrings List of strings to be converted into an HCatPartitionSpec.
+   * @return Deserialized HCatPartitionSpec instance.
+   * @throws HCatException On failure to deserialize. (e.g. incompatible serialization format, etc.)
+   */
+  @InterfaceAudience.LimitedPrivate({"Hive"})
+  @InterfaceStability.Evolving
+  public abstract HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException;
+
 }

Modified: hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java?rev=1636083&r1=1636082&r2=1636083&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java Sun Nov  2 05:15:53 2014
@@ -975,7 +975,7 @@ public class TestHCatClient {
       sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build());
 
       // The source table now has 2 partitions, one in TEXTFILE, the other in ORC.
-      // Test that adding these partitions to the target-table *without* replicating the table-change.
+      // Test adding these partitions to the target-table *without* replicating the table-change.
 
       List<HCatPartition> sourcePartitions = sourceMetaStore.getPartitions(dbName, tableName);
       assertEquals("Unexpected number of source partitions.", 2, sourcePartitions.size());
@@ -1007,4 +1007,139 @@ public class TestHCatClient {
       assertTrue("Unexpected exception! " + unexpected.getMessage(), false);
     }
   }
+
+  /**
+   * Test that partition-definitions can be replicated between HCat-instances,
+   * independently of table-metadata replication, using PartitionSpec interfaces.
+   * (This is essentially the same test as testPartitionRegistrationWithCustomSchema(),
+   * transliterated to use the PartitionSpec APIs.)
+   * 2 identical tables are created on 2 different HCat instances ("source" and "target").
+   * On the source instance,
+   * 1. One partition is added with the old format ("TEXTFILE").
+   * 2. The table is updated with an additional column and the data-format changed to ORC.
+   * 3. Another partition is added with the new format.
+   * 4. The partitions' metadata is copied to the target HCat instance, without updating the target table definition.
+   * 5. The partitions' metadata (columns, I/O formats, SerDe and SerDe params) is compared against the source.
+   * @throws Exception Declared for the test harness; unexpected exceptions inside the test body
+   *                   are logged and reported as assertion failures.
+   */
+  @Test
+  public void testPartitionSpecRegistrationWithCustomSchema() throws Exception {
+    try {
+      startReplicationTargetMetaStoreIfRequired();
+
+      HCatClient sourceMetaStore = HCatClient.create(new Configuration(hcatConf));
+      final String dbName = "myDb";
+      final String tableName = "myTable";
+
+      sourceMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+
+      sourceMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build());
+      List<HCatFieldSchema> columnSchema = new ArrayList<HCatFieldSchema>(
+          Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""),
+              new HCatFieldSchema("bar", Type.STRING, "")));
+
+      List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""),
+          new HCatFieldSchema("grid", Type.STRING, ""));
+
+      HCatTable sourceTable = new HCatTable(dbName, tableName).cols(columnSchema)
+          .partCols(partitionSchema)
+          .comment("Source table.");
+
+      sourceMetaStore.createTable(HCatCreateTableDesc.create(sourceTable).build());
+
+      // Verify that the sourceTable was created successfully.
+      sourceTable = sourceMetaStore.getTable(dbName, tableName);
+      assertNotNull("Table couldn't be queried for. ", sourceTable);
+
+      // Partitions added now should inherit table-schema, properties, etc.
+      Map<String, String> partitionSpec_1 = new HashMap<String, String>();
+      partitionSpec_1.put("grid", "AB");
+      partitionSpec_1.put("dt", "2011_12_31");
+      HCatPartition sourcePartition_1 = new HCatPartition(sourceTable, partitionSpec_1, "");
+
+      sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_1).build());
+      // assertEquals takes (message, expected, actual); keep that order so failure messages are accurate.
+      assertEquals("Unexpected number of partitions. ",
+          1, sourceMetaStore.getPartitions(dbName, tableName).size());
+      // Verify that partition_1 was added correctly, and properties were inherited from the HCatTable.
+      HCatPartition addedPartition_1 = sourceMetaStore.getPartition(dbName, tableName, partitionSpec_1);
+      assertEquals("Column schema doesn't match.", sourceTable.getCols(), addedPartition_1.getColumns());
+      assertEquals("InputFormat doesn't match.", sourceTable.getInputFileFormat(), addedPartition_1.getInputFormat());
+      assertEquals("OutputFormat doesn't match.", sourceTable.getOutputFileFormat(), addedPartition_1.getOutputFormat());
+      assertEquals("SerDe doesn't match.", sourceTable.getSerdeLib(), addedPartition_1.getSerDe());
+      assertEquals("SerDe params don't match.", sourceTable.getSerdeParams(), addedPartition_1.getSerdeParams());
+
+      // Replicate table definition.
+
+      HCatClient targetMetaStore = HCatClient.create(new Configuration(replicationTargetHCatConf));
+      targetMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE);
+
+      targetMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build());
+      // Make a copy of the source-table, as would be done across class-loaders.
+      HCatTable targetTable = targetMetaStore.deserializeTable(sourceMetaStore.serializeTable(sourceTable));
+      targetMetaStore.createTable(HCatCreateTableDesc.create(targetTable).build());
+      targetTable = targetMetaStore.getTable(dbName, tableName);
+
+      assertEquals("Created table doesn't match the source.",
+          HCatTable.NO_DIFF, targetTable.diff(sourceTable));
+
+      // Modify Table schema at the source.
+      List<HCatFieldSchema> newColumnSchema = new ArrayList<HCatFieldSchema>(columnSchema);
+      newColumnSchema.add(new HCatFieldSchema("goo_new", Type.DOUBLE, ""));
+      Map<String, String> tableParams = new HashMap<String, String>(1);
+      tableParams.put("orc.compress", "ZLIB");
+      sourceTable.cols(newColumnSchema) // Add a column.
+          .fileFormat("orcfile")     // Change SerDe, File I/O formats.
+          .tblProps(tableParams)
+          .serdeParam(serdeConstants.FIELD_DELIM, Character.toString('\001'));
+      sourceMetaStore.updateTableSchema(dbName, tableName, sourceTable);
+      sourceTable = sourceMetaStore.getTable(dbName, tableName);
+
+      // Add another partition to the source.
+      Map<String, String> partitionSpec_2 = new HashMap<String, String>();
+      partitionSpec_2.put("grid", "AB");
+      partitionSpec_2.put("dt", "2012_01_01");
+      HCatPartition sourcePartition_2 = new HCatPartition(sourceTable, partitionSpec_2, "");
+      sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build());
+
+      // The source table now has 2 partitions, one in TEXTFILE, the other in ORC.
+      // Test adding these partitions to the target-table *without* replicating the table-change.
+
+      HCatPartitionSpec sourcePartitionSpec = sourceMetaStore.getPartitionSpecs(dbName, tableName, -1);
+      assertEquals("Unexpected number of source partitions.", 2, sourcePartitionSpec.size());
+
+      // Serialize the hcatPartitionSpec.
+      List<String> partitionSpecString = sourceMetaStore.serializePartitionSpec(sourcePartitionSpec);
+
+      // Deserialize the HCatPartitionSpec using the target HCatClient instance.
+      HCatPartitionSpec targetPartitionSpec = targetMetaStore.deserializePartitionSpec(partitionSpecString);
+      assertEquals("Could not add the expected number of partitions.",
+          sourcePartitionSpec.size(), targetMetaStore.addPartitionSpec(targetPartitionSpec));
+
+      // Retrieve partitions.
+      targetPartitionSpec = targetMetaStore.getPartitionSpecs(dbName, tableName, -1);
+      assertEquals("Could not retrieve the expected number of partitions.",
+          sourcePartitionSpec.size(), targetPartitionSpec.size());
+
+      // Assert that the source and target partitions are equivalent.
+      // NOTE: the comparison is positional, so it relies on both metastores returning
+      // partitions in the same order.
+      HCatPartitionSpec.HCatPartitionIterator sourceIterator = sourcePartitionSpec.getPartitionIterator();
+      HCatPartitionSpec.HCatPartitionIterator targetIterator = targetPartitionSpec.getPartitionIterator();
+
+      while (targetIterator.hasNext()) {
+        // Target still has elements; if source does not, the source side is the shorter one.
+        assertTrue("Fewer source partitions than target.", sourceIterator.hasNext());
+        HCatPartition sourcePartition = sourceIterator.next();
+        HCatPartition targetPartition = targetIterator.next();
+        assertEquals("Column schema doesn't match.", sourcePartition.getColumns(), targetPartition.getColumns());
+        assertEquals("InputFormat doesn't match.", sourcePartition.getInputFormat(), targetPartition.getInputFormat());
+        assertEquals("OutputFormat doesn't match.", sourcePartition.getOutputFormat(), targetPartition.getOutputFormat());
+        assertEquals("SerDe doesn't match.", sourcePartition.getSerDe(), targetPartition.getSerDe());
+        assertEquals("SerDe params don't match.", sourcePartition.getSerdeParams(), targetPartition.getSerdeParams());
+      }
+      // Target is exhausted; any remaining source element means the target side is the shorter one.
+      assertTrue("Fewer target partitions than source.", !sourceIterator.hasNext());
+    }
+    catch (Exception unexpected) {
+      LOG.error("Unexpected exception! ", unexpected);
+      assertTrue("Unexpected exception! " + unexpected.getMessage(), false);
+    }
+  }
+
 }