Posted to hcatalog-commits@incubator.apache.org by tr...@apache.org on 2012/09/10 23:29:03 UTC
svn commit: r1383152 [12/27] - in /incubator/hcatalog/trunk: ./
hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/
hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/
hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ ...
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Mon Sep 10 23:28:55 2012
@@ -35,150 +35,152 @@ import org.apache.hcatalog.data.schema.H
public class HCatTableInfo implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- /** The db and table names */
- private final String databaseName;
- private final String tableName;
-
- /** The table schema. */
- private final HCatSchema dataColumns;
- private final HCatSchema partitionColumns;
-
- /** The table being written to */
- private final Table table;
-
- /** The storer info */
- private StorerInfo storerInfo;
-
- /**
- * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
- * for reading data from a table.
- * work with hadoop security, the kerberos principal name of the server - else null
- * The principal name should be of the form:
- * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
- * The special string _HOST will be replaced automatically with the correct host name
- * @param databaseName the db name
- * @param tableName the table name
- * @param dataColumns schema of columns which contain data
- * @param partitionColumns schema of partition columns
- * @param storerInfo information about storage descriptor
- * @param table hive metastore table class
- */
- HCatTableInfo(
- String databaseName,
- String tableName,
- HCatSchema dataColumns,
- HCatSchema partitionColumns,
- StorerInfo storerInfo,
- Table table) {
- this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
- this.tableName = tableName;
- this.dataColumns = dataColumns;
- this.table = table;
- this.storerInfo = storerInfo;
- this.partitionColumns = partitionColumns;
- }
-
- /**
- * Gets the value of databaseName
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * Gets the value of tableName
- * @return the tableName
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * @return return schema of data columns as defined in meta store
- */
- public HCatSchema getDataColumns() {
- return dataColumns;
- }
-
- /**
- * @return schema of partition columns
- */
- public HCatSchema getPartitionColumns() {
- return partitionColumns;
- }
-
- /**
- * @return the storerInfo
- */
- public StorerInfo getStorerInfo() {
- return storerInfo;
- }
-
- public String getTableLocation() {
- return table.getSd().getLocation();
- }
-
- /**
- * minimize dependency on hive classes so this is package private
- * this should eventually no longer be used
- * @return hive metastore representation of table
- */
- Table getTable() {
- return table;
- }
-
- /**
- * create an HCatTableInfo instance from the supplied Hive Table instance
- * @param table to create an instance from
- * @return HCatTableInfo
- * @throws IOException
- */
- static HCatTableInfo valueOf(Table table) throws IOException {
- // Explicitly use {@link org.apache.hadoop.hive.ql.metadata.Table} when getting the schema,
- // but store @{link org.apache.hadoop.hive.metastore.api.Table} as this class is serialized
- // into the job conf.
- org.apache.hadoop.hive.ql.metadata.Table mTable =
- new org.apache.hadoop.hive.ql.metadata.Table(table);
- HCatSchema schema = HCatUtil.extractSchema(mTable);
- StorerInfo storerInfo =
- InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
- HCatSchema partitionColumns = HCatUtil.getPartitionColumns(mTable);
- return new HCatTableInfo(table.getDbName(), table.getTableName(), schema,
- partitionColumns, storerInfo, table);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- HCatTableInfo tableInfo = (HCatTableInfo) o;
-
- if (dataColumns != null ? !dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null) return false;
- if (databaseName != null ? !databaseName.equals(tableInfo.databaseName) : tableInfo.databaseName != null) return false;
- if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null)
- return false;
- if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false;
- if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false;
- if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false;
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
- int result = databaseName != null ? databaseName.hashCode() : 0;
- result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
- result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0);
- result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0);
- result = 31 * result + (table != null ? table.hashCode() : 0);
- result = 31 * result + (storerInfo != null ? storerInfo.hashCode() : 0);
- return result;
- }
+ /** The db and table names */
+ private final String databaseName;
+ private final String tableName;
+
+ /** The table schema. */
+ private final HCatSchema dataColumns;
+ private final HCatSchema partitionColumns;
+
+ /** The table being written to */
+ private final Table table;
+
+ /** The storer info */
+ private StorerInfo storerInfo;
+
+ /**
+ * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
+ * for reading data from a table.
+ * @param databaseName the db name
+ * @param tableName the table name
+ * @param dataColumns schema of columns which contain data
+ * @param partitionColumns schema of partition columns
+ * @param storerInfo information about storage descriptor
+ * @param table hive metastore table class
+ */
+ HCatTableInfo(
+ String databaseName,
+ String tableName,
+ HCatSchema dataColumns,
+ HCatSchema partitionColumns,
+ StorerInfo storerInfo,
+ Table table) {
+ this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+ this.tableName = tableName;
+ this.dataColumns = dataColumns;
+ this.table = table;
+ this.storerInfo = storerInfo;
+ this.partitionColumns = partitionColumns;
+ }
+
+ /**
+ * Gets the value of databaseName
+ * @return the databaseName
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * Gets the value of tableName
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return return schema of data columns as defined in meta store
+ */
+ public HCatSchema getDataColumns() {
+ return dataColumns;
+ }
+
+ /**
+ * @return schema of partition columns
+ */
+ public HCatSchema getPartitionColumns() {
+ return partitionColumns;
+ }
+
+ /**
+ * @return the storerInfo
+ */
+ public StorerInfo getStorerInfo() {
+ return storerInfo;
+ }
+
+ public String getTableLocation() {
+ return table.getSd().getLocation();
+ }
+
+ /**
+ * minimize dependency on hive classes so this is package private
+ * this should eventually no longer be used
+ * @return hive metastore representation of table
+ */
+ Table getTable() {
+ return table;
+ }
+
+ /**
+ * create an HCatTableInfo instance from the supplied Hive Table instance
+ * @param table to create an instance from
+ * @return HCatTableInfo
+ * @throws IOException
+ */
+ static HCatTableInfo valueOf(Table table) throws IOException {
+ // Explicitly use {@link org.apache.hadoop.hive.ql.metadata.Table} when getting the schema,
+ // but store {@link org.apache.hadoop.hive.metastore.api.Table} as this class is serialized
+ // into the job conf.
+ org.apache.hadoop.hive.ql.metadata.Table mTable =
+ new org.apache.hadoop.hive.ql.metadata.Table(table);
+ HCatSchema schema = HCatUtil.extractSchema(mTable);
+ StorerInfo storerInfo =
+ InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
+ HCatSchema partitionColumns = HCatUtil.getPartitionColumns(mTable);
+ return new HCatTableInfo(table.getDbName(), table.getTableName(), schema,
+ partitionColumns, storerInfo, table);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HCatTableInfo tableInfo = (HCatTableInfo) o;
+
+ if (dataColumns != null ? !dataColumns.equals(tableInfo.dataColumns) : tableInfo.dataColumns != null)
+ return false;
+ if (databaseName != null ? !databaseName.equals(tableInfo.databaseName) : tableInfo.databaseName != null)
+ return false;
+ if (partitionColumns != null ? !partitionColumns.equals(tableInfo.partitionColumns) : tableInfo.partitionColumns != null)
+ return false;
+ if (storerInfo != null ? !storerInfo.equals(tableInfo.storerInfo) : tableInfo.storerInfo != null) return false;
+ if (table != null ? !table.equals(tableInfo.table) : tableInfo.table != null) return false;
+ if (tableName != null ? !tableName.equals(tableInfo.tableName) : tableInfo.tableName != null) return false;
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = databaseName != null ? databaseName.hashCode() : 0;
+ result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+ result = 31 * result + (dataColumns != null ? dataColumns.hashCode() : 0);
+ result = 31 * result + (partitionColumns != null ? partitionColumns.hashCode() : 0);
+ result = 31 * result + (table != null ? table.hashCode() : 0);
+ result = 31 * result + (storerInfo != null ? storerInfo.hashCode() : 0);
+ return result;
+ }
}
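For context, a minimal sketch (not part of this commit) of how a client typically reaches the HCatTableInfo that the input machinery serializes into the job configuration. The deserialization pattern follows the InitializeInput javadoc below; the helper name describeInput is illustrative.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatTableInfo;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    // Sketch only; assumes input has already been set up on the job.
    static HCatTableInfo describeInput(Job job) throws Exception {
        InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
                job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
        HCatTableInfo tableInfo = inputInfo.getTableInfo();
        // Data columns and partition columns are carried as separate schemas.
        HCatSchema dataColumns = tableInfo.getDataColumns();
        HCatSchema partitionColumns = tableInfo.getPartitionColumns();
        System.out.println(tableInfo.getDatabaseName() + "." + tableInfo.getTableName()
                + " stored at " + tableInfo.getTableLocation());
        return tableInfo;
    }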
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Mon Sep 10 23:28:55 2012
@@ -50,136 +50,136 @@ public class InitializeInput {
private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
- /**
- * Set the input to use for the Job. This queries the metadata server with the specified
- * partition predicates, gets the matching partitions, and puts the information in the job
- * configuration object.
- *
- * To ensure a known InputJobInfo state, only the database name, table name, filter, and
- * properties are preserved. All other modification from the given InputJobInfo are discarded.
- *
- * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
- * {code}
- * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
- * job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
- * {code}
- *
- * @param job the job object
- * @param theirInputJobInfo information on the Input to read
- * @throws Exception
- */
- public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
- InputJobInfo inputJobInfo = InputJobInfo.create(
- theirInputJobInfo.getDatabaseName(),
- theirInputJobInfo.getTableName(),
- theirInputJobInfo.getFilter());
- inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
- job.getConfiguration().set(
- HCatConstants.HCAT_KEY_JOB_INFO,
- HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
- }
-
- /**
- * Returns the given InputJobInfo after populating with data queried from the metadata service.
- */
- private static InputJobInfo getInputJobInfo(
- Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
-
- HiveMetaStoreClient client = null;
- HiveConf hiveConf = null;
- try {
- if (job != null){
- hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
- } else {
- hiveConf = new HiveConf(HCatInputFormat.class);
- }
- client = HCatUtil.getHiveClient(hiveConf);
- Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
- inputJobInfo.getTableName());
-
- List<PartInfo> partInfoList = new ArrayList<PartInfo>();
-
- inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
- if( table.getPartitionKeys().size() != 0 ) {
- //Partitioned table
- List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
- inputJobInfo.getTableName(),
- inputJobInfo.getFilter(),
- (short) -1);
-
- // Default to 100,000 partitions if hive.metastore.maxpartition is not defined
- int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
- if (parts != null && parts.size() > maxPart) {
- throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
- }
+ /**
+ * Set the input to use for the Job. This queries the metadata server with the specified
+ * partition predicates, gets the matching partitions, and puts the information in the job
+ * configuration object.
+ *
+ * To ensure a known InputJobInfo state, only the database name, table name, filter, and
+ * properties are preserved. All other modifications to the given InputJobInfo are discarded.
+ *
+ * After calling setInput, the InputJobInfo can be retrieved from the job configuration as follows:
+ * <pre>{@code
+ * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
+ * job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+ * }</pre>
+ *
+ * @param job the job object
+ * @param theirInputJobInfo information on the Input to read
+ * @throws Exception
+ */
+ public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+ InputJobInfo inputJobInfo = InputJobInfo.create(
+ theirInputJobInfo.getDatabaseName(),
+ theirInputJobInfo.getTableName(),
+ theirInputJobInfo.getFilter());
+ inputJobInfo.getProperties().putAll(theirInputJobInfo.getProperties());
+ job.getConfiguration().set(
+ HCatConstants.HCAT_KEY_JOB_INFO,
+ HCatUtil.serialize(getInputJobInfo(job, inputJobInfo, null)));
+ }
- // populate partition info
- for (Partition ptn : parts){
- HCatSchema schema = HCatUtil.extractSchema(
- new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
- PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
- ptn.getParameters(), job.getConfiguration(), inputJobInfo);
- partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
- partInfoList.add(partInfo);
+ /**
+ * Returns the given InputJobInfo after populating with data queried from the metadata service.
+ */
+ private static InputJobInfo getInputJobInfo(
+ Job job, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+
+ HiveMetaStoreClient client = null;
+ HiveConf hiveConf = null;
+ try {
+ if (job != null) {
+ hiveConf = HCatUtil.getHiveConf(job.getConfiguration());
+ } else {
+ hiveConf = new HiveConf(HCatInputFormat.class);
+ }
+ client = HCatUtil.getHiveClient(hiveConf);
+ Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
+ inputJobInfo.getTableName());
+
+ List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+ inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+ if (table.getPartitionKeys().size() != 0) {
+ //Partitioned table
+ List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
+ inputJobInfo.getTableName(),
+ inputJobInfo.getFilter(),
+ (short) -1);
+
+ // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
+ int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+ if (parts != null && parts.size() > maxPart) {
+ throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
+ }
+
+ // populate partition info
+ for (Partition ptn : parts) {
+ HCatSchema schema = HCatUtil.extractSchema(
+ new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
+ PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
+ ptn.getParameters(), job.getConfiguration(), inputJobInfo);
+ partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+ partInfoList.add(partInfo);
+ }
+
+ } else {
+ //Non partitioned table
+ HCatSchema schema = HCatUtil.extractSchema(table);
+ PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+ table.getParameters(), job.getConfiguration(), inputJobInfo);
+ partInfo.setPartitionValues(new HashMap<String, String>());
+ partInfoList.add(partInfo);
+ }
+ inputJobInfo.setPartitions(partInfoList);
+
+ return inputJobInfo;
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
}
- }else{
- //Non partitioned table
- HCatSchema schema = HCatUtil.extractSchema(table);
- PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
- table.getParameters(), job.getConfiguration(), inputJobInfo);
- partInfo.setPartitionValues(new HashMap<String,String>());
- partInfoList.add(partInfo);
- }
- inputJobInfo.setPartitions(partInfoList);
-
- return inputJobInfo;
- } finally {
- HCatUtil.closeHiveClientQuietly(client);
}
- }
+ private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException {
+ List<String> values = ptn.getValues();
+ if (values.size() != table.getPartitionKeys().size()) {
+ throw new IOException("Partition values in partition inconsistent with table definition, table "
+ + table.getTableName() + " has "
+ + table.getPartitionKeys().size()
+ + " partition keys, partition has " + values.size() + "partition values");
+ }
- private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
- List<String> values = ptn.getValues();
- if( values.size() != table.getPartitionKeys().size() ) {
- throw new IOException("Partition values in partition inconsistent with table definition, table "
- + table.getTableName() + " has "
- + table.getPartitionKeys().size()
- + " partition keys, partition has " + values.size() + "partition values" );
- }
+ Map<String, String> ptnKeyValues = new HashMap<String, String>();
- Map<String,String> ptnKeyValues = new HashMap<String,String>();
+ int i = 0;
+ for (FieldSchema schema : table.getPartitionKeys()) {
+ // NOTE: this mapping relies on table.getPartitionKeys() and ptn.getValues() preserving the same order
+ ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+ i++;
+ }
- int i = 0;
- for(FieldSchema schema : table.getPartitionKeys()) {
- // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
- ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
- i++;
+ return ptnKeyValues;
}
- return ptnKeyValues;
- }
-
- private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
- Map<String,String> parameters, Configuration conf,
- InputJobInfo inputJobInfo) throws IOException{
+ private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
+ Map<String, String> parameters, Configuration conf,
+ InputJobInfo inputJobInfo) throws IOException {
- StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
+ StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
- Properties hcatProperties = new Properties();
- HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);
+ Properties hcatProperties = new Properties();
+ HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);
- // copy the properties from storageHandler to jobProperties
- Map<String, String>jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
+ // copy the properties from storageHandler to jobProperties
+ Map<String, String> jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
- for (String key : parameters.keySet()){
- hcatProperties.put(key, parameters.get(key));
+ for (String key : parameters.keySet()) {
+ hcatProperties.put(key, parameters.get(key));
+ }
+ // FIXME
+ // Bloating partinfo with inputJobInfo is not good
+ return new PartInfo(schema, storageHandler, sd.getLocation(),
+ hcatProperties, jobProperties, inputJobInfo.getTableInfo());
}
- // FIXME
- // Bloating partinfo with inputJobInfo is not good
- return new PartInfo(schema, storageHandler, sd.getLocation(),
- hcatProperties, jobProperties, inputJobInfo.getTableInfo());
- }
}
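A minimal usage sketch for setInput() (not part of this commit); the database, table, and filter literals are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.InitializeInput;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    // Sketch only; "default", "web_logs", and the filter are illustrative values.
    static Job setUpRead() throws Exception {
        Job job = new Job(new Configuration(), "hcat-read-example");
        InputJobInfo info = InputJobInfo.create("default", "web_logs", "ds='2012-09-10'");
        // Only the database name, table name, filter, and properties of 'info'
        // survive; setInput() re-creates and populates the InputJobInfo, then
        // serializes it into the job configuration.
        InitializeInput.setInput(job, info);
        return job;
    }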
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java Mon Sep 10 23:28:55 2012
@@ -24,114 +24,114 @@ import java.util.List;
import java.util.Properties;
/** The class used to serialize and store the information read from the metadata server */
-public class InputJobInfo implements Serializable{
+public class InputJobInfo implements Serializable {
- /** The serialization version */
- private static final long serialVersionUID = 1L;
+ /** The serialization version */
+ private static final long serialVersionUID = 1L;
- /** The db and table names. */
- private final String databaseName;
- private final String tableName;
-
- /** meta information of the table to be read from */
- private HCatTableInfo tableInfo;
-
- /** The partition filter */
- private String filter;
-
- /** The list of partitions matching the filter. */
- private List<PartInfo> partitions;
-
- /** implementation specific job properties */
- private Properties properties;
-
- /**
- * Initializes a new InputJobInfo
- * for reading data from a table.
- * @param databaseName the db name
- * @param tableName the table name
- * @param filter the partition filter
- */
-
- public static InputJobInfo create(String databaseName,
- String tableName,
- String filter) {
- return new InputJobInfo(databaseName, tableName, filter);
- }
-
-
- private InputJobInfo(String databaseName,
- String tableName,
- String filter) {
- this.databaseName = (databaseName == null) ?
- MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
- this.tableName = tableName;
- this.filter = filter;
- this.properties = new Properties();
- }
-
- /**
- * Gets the value of databaseName
- * @return the databaseName
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * Gets the value of tableName
- * @return the tableName
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * Gets the table's meta information
- * @return the HCatTableInfo
- */
- public HCatTableInfo getTableInfo() {
- return tableInfo;
- }
-
- /**
- * set the tablInfo instance
- * this should be the same instance
- * determined by this object's DatabaseName and TableName
- * @param tableInfo
- */
- void setTableInfo(HCatTableInfo tableInfo) {
- this.tableInfo = tableInfo;
- }
-
- /**
- * Gets the value of partition filter
- * @return the filter string
- */
- public String getFilter() {
- return filter;
- }
-
- /**
- * @return partition info
- */
- public List<PartInfo> getPartitions() {
- return partitions;
- }
-
- /**
- * @return partition info list
- */
- void setPartitions(List<PartInfo> partitions) {
- this.partitions = partitions;
- }
-
- /**
- * Set/Get Property information to be passed down to *StorageHandler implementation
- * put implementation specific storage handler configurations here
- * @return the implementation specific job properties
- */
- public Properties getProperties() {
- return properties;
- }
+ /** The db and table names. */
+ private final String databaseName;
+ private final String tableName;
+
+ /** meta information of the table to be read from */
+ private HCatTableInfo tableInfo;
+
+ /** The partition filter */
+ private String filter;
+
+ /** The list of partitions matching the filter. */
+ private List<PartInfo> partitions;
+
+ /** implementation specific job properties */
+ private Properties properties;
+
+ /**
+ * Initializes a new InputJobInfo
+ * for reading data from a table.
+ * @param databaseName the db name
+ * @param tableName the table name
+ * @param filter the partition filter
+ */
+
+ public static InputJobInfo create(String databaseName,
+ String tableName,
+ String filter) {
+ return new InputJobInfo(databaseName, tableName, filter);
+ }
+
+
+ private InputJobInfo(String databaseName,
+ String tableName,
+ String filter) {
+ this.databaseName = (databaseName == null) ?
+ MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+ this.tableName = tableName;
+ this.filter = filter;
+ this.properties = new Properties();
+ }
+
+ /**
+ * Gets the value of databaseName
+ * @return the databaseName
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * Gets the value of tableName
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Gets the table's meta information
+ * @return the HCatTableInfo
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ /**
+ * Sets the tableInfo instance; this should be the same instance
+ * determined by this object's DatabaseName and TableName
+ * @param tableInfo
+ */
+ void setTableInfo(HCatTableInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ }
+
+ /**
+ * Gets the value of partition filter
+ * @return the filter string
+ */
+ public String getFilter() {
+ return filter;
+ }
+
+ /**
+ * @return partition info
+ */
+ public List<PartInfo> getPartitions() {
+ return partitions;
+ }
+
+ /**
+ * @param partitions the list of partitions matching the filter
+ */
+ void setPartitions(List<PartInfo> partitions) {
+ this.partitions = partitions;
+ }
+
+ /**
+ * Set/Get Property information to be passed down to *StorageHandler implementation
+ * put implementation specific storage handler configurations here
+ * @return the implementation specific job properties
+ */
+ public Properties getProperties() {
+ return properties;
+ }
}
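A short sketch of the factory and property plumbing above (not part of this commit); the property key is a hypothetical storage-handler option.

    import org.apache.hcatalog.mapreduce.InputJobInfo;

    static InputJobInfo buildInputInfo() {
        // A null databaseName falls back to MetaStoreUtils.DEFAULT_DATABASE_NAME
        // ("default"); a null filter selects all partitions.
        InputJobInfo info = InputJobInfo.create(null, "web_logs", null);
        // Hypothetical storage-handler option, passed down via getProperties().
        info.getProperties().setProperty("example.handler.option", "value");
        return info;
    }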
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Mon Sep 10 23:28:55 2012
@@ -57,132 +57,132 @@ class InternalUtil {
static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
Properties hcatProperties = new Properties();
- for (String key : properties.keySet()){
+ for (String key : properties.keySet()) {
hcatProperties.put(key, properties.get(key));
}
// also populate with StorageDescriptor->SerDe.Parameters
- for (Map.Entry<String, String>param :
+ for (Map.Entry<String, String> param :
sd.getSerdeInfo().getParameters().entrySet()) {
- hcatProperties.put(param.getKey(), param.getValue());
+ hcatProperties.put(param.getKey(), param.getValue());
}
return new StorerInfo(
- sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
- properties.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE),
- hcatProperties);
- }
-
- static StructObjectInspector createStructObjectInspector(HCatSchema outputSchema) throws IOException {
-
- if(outputSchema == null ) {
- throw new IOException("Invalid output schema specified");
- }
-
- List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
- List<String> fieldNames = new ArrayList<String>();
-
- for(HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
- TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
-
- fieldNames.add(hcatFieldSchema.getName());
- fieldInspectors.add(getObjectInspector(type));
- }
-
- StructObjectInspector structInspector = ObjectInspectorFactory.
- getStandardStructObjectInspector(fieldNames, fieldInspectors);
- return structInspector;
- }
-
- private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
-
- switch(type.getCategory()) {
-
- case PRIMITIVE :
- PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
- return PrimitiveObjectInspectorFactory.
- getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
-
- case MAP :
- MapTypeInfo mapType = (MapTypeInfo) type;
- MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
- getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
- return mapInspector;
-
- case LIST :
- ListTypeInfo listType = (ListTypeInfo) type;
- ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
- getObjectInspector(listType.getListElementTypeInfo()));
- return listInspector;
-
- case STRUCT :
- StructTypeInfo structType = (StructTypeInfo) type;
- List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
-
- List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
- for(TypeInfo fieldType : fieldTypes) {
- fieldInspectors.add(getObjectInspector(fieldType));
- }
-
- StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
- structType.getAllStructFieldNames(), fieldInspectors);
- return structInspector;
-
- default :
- throw new IOException("Unknown field schema type");
- }
- }
-
- //TODO this has to find a better home, it's also hardcoded as default in hive would be nice
- // if the default was decided by the serde
- static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
- throws SerDeException {
- serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
- }
-
- static void initializeDeserializer(Deserializer deserializer, Configuration conf,
- HCatTableInfo info, HCatSchema schema) throws SerDeException {
- Properties props = getSerdeProperties(info, schema);
- LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
- deserializer.initialize(conf, props);
- }
-
- private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
- throws SerDeException {
- Properties props = new Properties();
- List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
- props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
- MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
- props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
- MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
-
- // setting these props to match LazySimpleSerde
- props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
- props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
-
- //add props from params set in table schema
- props.putAll(info.getStorerInfo().getProperties());
-
- return props;
- }
-
-static Reporter createReporter(TaskAttemptContext context) {
- return new ProgressReporter(context);
- }
-
- /**
- * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
- * @param split the InputSplit
- * @return the HCatSplit
- * @throws IOException
- */
- public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
- if (split instanceof HCatSplit) {
- return (HCatSplit) split;
- } else {
- throw new IOException("Split must be " + HCatSplit.class.getName()
- + " but found " + split.getClass().getName());
+ sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
+ properties.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE),
+ hcatProperties);
+ }
+
+ static StructObjectInspector createStructObjectInspector(HCatSchema outputSchema) throws IOException {
+
+ if (outputSchema == null) {
+ throw new IOException("Invalid output schema specified");
+ }
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ List<String> fieldNames = new ArrayList<String>();
+
+ for (HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+ TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+ fieldNames.add(hcatFieldSchema.getName());
+ fieldInspectors.add(getObjectInspector(type));
+ }
+
+ StructObjectInspector structInspector = ObjectInspectorFactory.
+ getStandardStructObjectInspector(fieldNames, fieldInspectors);
+ return structInspector;
+ }
+
+ private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+
+ switch (type.getCategory()) {
+
+ case PRIMITIVE:
+ PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+ return PrimitiveObjectInspectorFactory.
+ getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+ case MAP:
+ MapTypeInfo mapType = (MapTypeInfo) type;
+ MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+ getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+ return mapInspector;
+
+ case LIST:
+ ListTypeInfo listType = (ListTypeInfo) type;
+ ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+ getObjectInspector(listType.getListElementTypeInfo()));
+ return listInspector;
+
+ case STRUCT:
+ StructTypeInfo structType = (StructTypeInfo) type;
+ List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ for (TypeInfo fieldType : fieldTypes) {
+ fieldInspectors.add(getObjectInspector(fieldType));
+ }
+
+ StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ structType.getAllStructFieldNames(), fieldInspectors);
+ return structInspector;
+
+ default:
+ throw new IOException("Unknown field schema type");
+ }
+ }
+
+ // TODO: this has to find a better home; it's also hardcoded as the default in Hive.
+ // It would be nice if the default were decided by the serde.
+ static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+ throws SerDeException {
+ serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
+ }
+
+ static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+ HCatTableInfo info, HCatSchema schema) throws SerDeException {
+ Properties props = getSerdeProperties(info, schema);
+ LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
+ deserializer.initialize(conf, props);
+ }
+
+ private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+ throws SerDeException {
+ Properties props = new Properties();
+ List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
+ props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
+ MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+ props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
+ MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+ // setting these props to match LazySimpleSerde
+ props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_NULL_FORMAT, "\\N");
+ props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+
+ //add props from params set in table schema
+ props.putAll(info.getStorerInfo().getProperties());
+
+ return props;
+ }
+
+ static Reporter createReporter(TaskAttemptContext context) {
+ return new ProgressReporter(context);
+ }
+
+ /**
+ * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+ * @param split the InputSplit
+ * @return the HCatSplit
+ * @throws IOException
+ */
+ public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+ if (split instanceof HCatSplit) {
+ return (HCatSplit) split;
+ } else {
+ throw new IOException("Split must be " + HCatSplit.class.getName()
+ + " but found " + split.getClass().getName());
+ }
}
- }
}
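For reference, the recursion in getObjectInspector() bottoms out in factory calls like these; this hand-built inspector matches what createStructObjectInspector() would produce for a two-column schema (name string, cnt int). Sketch only, not part of this commit.

    import java.util.Arrays;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    // The struct inspector for schema (name string, cnt int), built directly.
    StructObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("name", "cnt"),
            Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                    PrimitiveObjectInspectorFactory.javaIntObjectInspector));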
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Mon Sep 10 23:28:55 2012
@@ -207,7 +207,7 @@ public class MultiOutputFormat extends O
* @throws InterruptedException
*/
public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
context.write(new Text(alias), keyval);
}
@@ -227,14 +227,14 @@ public class MultiOutputFormat extends O
@Override
public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
- throws IOException,
- InterruptedException {
+ throws IOException,
+ InterruptedException {
return new MultiRecordWriter(context);
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
- InterruptedException {
+ InterruptedException {
return new MultiOutputCommitter(context);
}
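A sketch of the write() entry point above from inside a task (not part of this commit); the aliases "errors" and "metrics" are illustrative and must match the aliases configured for this job's MultiOutputFormat.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hcatalog.mapreduce.MultiOutputFormat;

    public class RoutingMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Route each record to one of the configured outputs by alias.
            String alias = value.toString().startsWith("ERROR") ? "errors" : "metrics";
            MultiOutputFormat.write(alias, key, value, context);
        }
    }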
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java Mon Sep 10 23:28:55 2012
@@ -38,7 +38,7 @@ abstract class OutputFormatContainer ext
/**
* @param of OutputFormat this instance will contain
*/
- public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>,? super Writable> of) {
+ public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
this.of = of;
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Mon Sep 10 23:28:55 2012
@@ -31,238 +31,239 @@ import org.apache.hcatalog.data.schema.H
/** The class used to serialize and store the output related information */
public class OutputJobInfo implements Serializable {
- /** The db and table names. */
- private final String databaseName;
- private final String tableName;
-
- /** The serialization version. */
- private static final long serialVersionUID = 1L;
-
- /** The table info provided by user. */
- private HCatTableInfo tableInfo;
-
- /** The output schema. This is given to us by user. This wont contain any
- * partition columns ,even if user has specified them.
- * */
- private HCatSchema outputSchema;
-
- /** The location of the partition being written */
- private String location;
-
- /** The partition values to publish to, if used for output*/
- private Map<String, String> partitionValues;
-
- private List<Integer> posOfPartCols;
- private List<Integer> posOfDynPartCols;
-
- private Properties properties;
-
- private int maxDynamicPartitions;
-
- /** List of keys for which values were not specified at write setup time, to be infered at write time */
- private List<String> dynamicPartitioningKeys;
-
- private boolean harRequested;
-
- /**
- * Initializes a new OutputJobInfo instance
- * for writing data from a table.
- * @param databaseName the db name
- * @param tableName the table name
- * @param partitionValues The partition values to publish to, can be null or empty Map to
- * work with hadoop security, the kerberos principal name of the server - else null
- * The principal name should be of the form:
- * <servicename>/_HOST@<realm> like "hcat/_HOST@myrealm.com"
- * The special string _HOST will be replaced automatically with the correct host name
- * indicate write to a unpartitioned table. For partitioned tables, this map should
- * contain keys for all partition columns with corresponding values.
- */
- public static OutputJobInfo create(String databaseName,
- String tableName,
- Map<String, String> partitionValues) {
- return new OutputJobInfo(databaseName,
- tableName,
- partitionValues);
- }
-
- private OutputJobInfo(String databaseName,
- String tableName,
- Map<String, String> partitionValues) {
- this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
- this.tableName = tableName;
- this.partitionValues = partitionValues;
- this.properties = new Properties();
- }
-
- /**
- * @return the posOfPartCols
- */
- protected List<Integer> getPosOfPartCols() {
- return posOfPartCols;
- }
-
- /**
- * @return the posOfDynPartCols
- */
- protected List<Integer> getPosOfDynPartCols() {
- return posOfDynPartCols;
- }
-
- /**
- * @param posOfPartCols the posOfPartCols to set
- */
- protected void setPosOfPartCols(List<Integer> posOfPartCols) {
- // sorting the list in the descending order so that deletes happen back-to-front
- Collections.sort(posOfPartCols, new Comparator<Integer> () {
- @Override
- public int compare(Integer earlier, Integer later) {
- return (earlier > later) ? -1 : ((earlier == later) ? 0 : 1);
- }
- });
- this.posOfPartCols = posOfPartCols;
- }
+ /** The db and table names. */
+ private final String databaseName;
+ private final String tableName;
+
+ /** The serialization version. */
+ private static final long serialVersionUID = 1L;
+
+ /** The table info provided by user. */
+ private HCatTableInfo tableInfo;
+
+ /** The output schema. This is given to us by the user. This won't contain any
+ * partition columns, even if the user has specified them.
+ */
+ private HCatSchema outputSchema;
+
+ /** The location of the partition being written */
+ private String location;
+
+ /** The partition values to publish to, if used for output*/
+ private Map<String, String> partitionValues;
+
+ private List<Integer> posOfPartCols;
+ private List<Integer> posOfDynPartCols;
+
+ private Properties properties;
+
+ private int maxDynamicPartitions;
+
+ /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+ private List<String> dynamicPartitioningKeys;
+
+ private boolean harRequested;
+
+ /**
+ * Initializes a new OutputJobInfo instance
+ * for writing data to a table.
+ * @param databaseName the db name
+ * @param tableName the table name
+ * @param partitionValues The partition values to publish to; can be null or an empty Map to
+ * indicate a write to an unpartitioned table. For partitioned tables, this map should
+ * contain keys for all partition columns with corresponding values.
+ */
+ public static OutputJobInfo create(String databaseName,
+ String tableName,
+ Map<String, String> partitionValues) {
+ return new OutputJobInfo(databaseName,
+ tableName,
+ partitionValues);
+ }
- /**
+ private OutputJobInfo(String databaseName,
+ String tableName,
+ Map<String, String> partitionValues) {
+ this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+ this.tableName = tableName;
+ this.partitionValues = partitionValues;
+ this.properties = new Properties();
+ }
+
+ /**
+ * @return the posOfPartCols
+ */
+ protected List<Integer> getPosOfPartCols() {
+ return posOfPartCols;
+ }
+
+ /**
+ * @return the posOfDynPartCols
+ */
+ protected List<Integer> getPosOfDynPartCols() {
+ return posOfDynPartCols;
+ }
+
+ /**
+ * @param posOfPartCols the posOfPartCols to set
+ */
+ protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+ // sorting the list in the descending order so that deletes happen back-to-front
+ Collections.sort(posOfPartCols, new Comparator<Integer>() {
+ @Override
+ public int compare(Integer earlier, Integer later) {
+ return (earlier > later) ? -1 : (earlier.equals(later) ? 0 : 1); // equals, not ==, for boxed Integers
+ }
+ });
+ this.posOfPartCols = posOfPartCols;
+ }
+
+ /**
* @param posOfDynPartCols the posOfDynPartCols to set
*/
protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
- // Important - no sorting here! We retain order, it's used to match with values at runtime
- this.posOfDynPartCols = posOfDynPartCols;
+ // Important - no sorting here! We retain order, it's used to match with values at runtime
+ this.posOfDynPartCols = posOfDynPartCols;
}
- /**
- * @return the tableInfo
- */
- public HCatTableInfo getTableInfo() {
- return tableInfo;
- }
-
- /**
- * @return the outputSchema
- */
- public HCatSchema getOutputSchema() {
- return outputSchema;
- }
-
- /**
- * @param schema the outputSchema to set
- */
- public void setOutputSchema(HCatSchema schema) {
- this.outputSchema = schema;
- }
-
- /**
- * @return the location
- */
- public String getLocation() {
- return location;
- }
-
- /**
- * @param location location to write to
- */
- public void setLocation(String location) {
- this.location = location;
- }
- /**
- * Sets the value of partitionValues
- * @param partitionValues the partition values to set
- */
- void setPartitionValues(Map<String, String> partitionValues) {
- this.partitionValues = partitionValues;
- }
-
- /**
- * Gets the value of partitionValues
- * @return the partitionValues
- */
- public Map<String, String> getPartitionValues() {
- return partitionValues;
- }
-
- /**
- * set the tablInfo instance
- * this should be the same instance
- * determined by this object's DatabaseName and TableName
- * @param tableInfo
- */
- void setTableInfo(HCatTableInfo tableInfo) {
- this.tableInfo = tableInfo;
- }
-
- /**
- * @return database name of table to write to
- */
- public String getDatabaseName() {
- return databaseName;
- }
-
- /**
- * @return name of table to write to
- */
- public String getTableName() {
- return tableName;
- }
-
- /**
- * Set/Get Property information to be passed down to *StorageHandler implementation
- * put implementation specific storage handler configurations here
- * @return the implementation specific job properties
- */
- public Properties getProperties() {
- return properties;
- }
-
- /**
- * Set maximum number of allowable dynamic partitions
- * @param maxDynamicPartitions
- */
- public void setMaximumDynamicPartitions(int maxDynamicPartitions){
- this.maxDynamicPartitions = maxDynamicPartitions;
- }
-
- /**
- * Returns maximum number of allowable dynamic partitions
- * @return maximum number of allowable dynamic partitions
- */
- public int getMaxDynamicPartitions() {
- return this.maxDynamicPartitions;
- }
-
- /**
- * Sets whether or not hadoop archiving has been requested for this job
- * @param harRequested
- */
- public void setHarRequested(boolean harRequested){
- this.harRequested = harRequested;
- }
-
- /**
- * Returns whether or not hadoop archiving has been requested for this job
- * @return whether or not hadoop archiving has been requested for this job
- */
- public boolean getHarRequested() {
- return this.harRequested;
- }
-
- /**
- * Returns whether or not Dynamic Partitioning is used
- * @return whether or not dynamic partitioning is currently enabled and used
- */
- public boolean isDynamicPartitioningUsed() {
- return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
- }
-
- /**
- * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
- * @param dynamicPartitioningKeys
- */
- public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
- this.dynamicPartitioningKeys = dynamicPartitioningKeys;
- }
-
- public List<String> getDynamicPartitioningKeys(){
- return this.dynamicPartitioningKeys;
- }
+ /**
+ * @return the tableInfo
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ /**
+ * @return the outputSchema
+ */
+ public HCatSchema getOutputSchema() {
+ return outputSchema;
+ }
+
+ /**
+ * @param schema the outputSchema to set
+ */
+ public void setOutputSchema(HCatSchema schema) {
+ this.outputSchema = schema;
+ }
+
+ /**
+ * @return the location
+ */
+ public String getLocation() {
+ return location;
+ }
+
+ /**
+ * @param location location to write to
+ */
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ /**
+ * Sets the value of partitionValues
+ * @param partitionValues the partition values to set
+ */
+ void setPartitionValues(Map<String, String> partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ /**
+ * Gets the value of partitionValues
+ * @return the partitionValues
+ */
+ public Map<String, String> getPartitionValues() {
+ return partitionValues;
+ }
+
+ /**
+ * Sets the tableInfo instance; this should be the same instance
+ * determined by this object's DatabaseName and TableName
+ * @param tableInfo
+ */
+ void setTableInfo(HCatTableInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ }
+
+ /**
+ * @return database name of table to write to
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * @return name of table to write to
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Set/Get Property information to be passed down to *StorageHandler implementation
+ * put implementation specific storage handler configurations here
+ * @return the implementation specific job properties
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Set maximum number of allowable dynamic partitions
+ * @param maxDynamicPartitions
+ */
+ public void setMaximumDynamicPartitions(int maxDynamicPartitions) {
+ this.maxDynamicPartitions = maxDynamicPartitions;
+ }
+
+ /**
+ * Returns maximum number of allowable dynamic partitions
+ * @return maximum number of allowable dynamic partitions
+ */
+ public int getMaxDynamicPartitions() {
+ return this.maxDynamicPartitions;
+ }
+
+ /**
+ * Sets whether or not hadoop archiving has been requested for this job
+ * @param harRequested
+ */
+ public void setHarRequested(boolean harRequested) {
+ this.harRequested = harRequested;
+ }
+
+ /**
+ * Returns whether or not hadoop archiving has been requested for this job
+ * @return whether or not hadoop archiving has been requested for this job
+ */
+ public boolean getHarRequested() {
+ return this.harRequested;
+ }
+
+ /**
+ * Returns whether or not Dynamic Partitioning is used
+ * @return whether or not dynamic partitioning is currently enabled and used
+ */
+ public boolean isDynamicPartitioningUsed() {
+ return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+ }
+
+ /**
+ * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
+ * @param dynamicPartitioningKeys
+ */
+ public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+ this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+ }
+
+ public List<String> getDynamicPartitioningKeys() {
+ return this.dynamicPartitioningKeys;
+ }
}
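A minimal write-side sketch (not part of this commit); the table name and partition values are illustrative, and HCatOutputFormat.setOutput() is the usual caller that populates the remaining fields (tableInfo, location, and so on).

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    static Job setUpWrite() throws Exception {
        Job job = new Job(new Configuration(), "hcat-write-example");
        Map<String, String> partitionValues = new HashMap<String, String>();
        partitionValues.put("ds", "2012-09-10"); // one entry per partition column
        OutputJobInfo info = OutputJobInfo.create("default", "web_logs", partitionValues);
        HCatOutputFormat.setOutput(job, info); // fills in tableInfo, location, etc.
        return job;
    }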
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java Mon Sep 10 23:28:55 2012
@@ -26,138 +26,138 @@ import org.apache.hcatalog.data.schema.H
/** The Class used to serialize the partition information read from the metadata server that maps to a partition. */
public class PartInfo implements Serializable {
- /** The serialization version */
- private static final long serialVersionUID = 1L;
+ /** The serialization version */
+ private static final long serialVersionUID = 1L;
- /** The partition schema. */
- private final HCatSchema partitionSchema;
+ /** The partition schema. */
+ private final HCatSchema partitionSchema;
- /** The information about which input storage handler to use */
- private final String storageHandlerClassName;
- private final String inputFormatClassName;
- private final String outputFormatClassName;
- private final String serdeClassName;
-
- /** HCat-specific properties set at the partition */
- private final Properties hcatProperties;
-
- /** The data location. */
- private final String location;
-
- /** The map of partition key names and their values. */
- private Map<String,String> partitionValues;
-
- /** Job properties associated with this parition */
- Map<String,String> jobProperties;
-
- /** the table info associated with this partition */
- HCatTableInfo tableInfo;
-
- /**
- * Instantiates a new hcat partition info.
- * @param partitionSchema the partition schema
- * @param storageHandler the storage handler
- * @param location the location
- * @param hcatProperties hcat-specific properties at the partition
- * @param jobProperties the job properties
- * @param tableInfo the table information
- */
- public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
- String location, Properties hcatProperties,
- Map<String,String> jobProperties, HCatTableInfo tableInfo){
- this.partitionSchema = partitionSchema;
- this.location = location;
- this.hcatProperties = hcatProperties;
- this.jobProperties = jobProperties;
- this.tableInfo = tableInfo;
-
- this.storageHandlerClassName = storageHandler.getClass().getName();
- this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
- this.serdeClassName = storageHandler.getSerDeClass().getName();
- this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
-}
-
- /**
- * Gets the value of partitionSchema.
- * @return the partitionSchema
- */
- public HCatSchema getPartitionSchema() {
- return partitionSchema;
- }
-
- /**
- * @return the storage handler class name
- */
- public String getStorageHandlerClassName() {
- return storageHandlerClassName;
- }
-
- /**
- * @return the inputFormatClassName
- */
- public String getInputFormatClassName() {
- return inputFormatClassName;
- }
-
- /**
- * @return the outputFormatClassName
- */
- public String getOutputFormatClassName() {
- return outputFormatClassName;
- }
-
- /**
- * @return the serdeClassName
- */
- public String getSerdeClassName() {
- return serdeClassName;
- }
-
- /**
- * Gets the input storage handler properties.
- * @return HCat-specific properties set at the partition
- */
- public Properties getInputStorageHandlerProperties() {
- return hcatProperties;
- }
-
- /**
- * Gets the value of location.
- * @return the location
- */
- public String getLocation() {
- return location;
- }
-
- /**
- * Sets the partition values.
- * @param partitionValues the new partition values
- */
- public void setPartitionValues(Map<String,String> partitionValues) {
- this.partitionValues = partitionValues;
- }
-
- /**
- * Gets the partition values.
- * @return the partition values
- */
- public Map<String,String> getPartitionValues() {
- return partitionValues;
- }
-
- /**
- * Gets the job properties.
- * @return a map of the job properties
- */
- public Map<String,String> getJobProperties() {
- return jobProperties;
- }
-
- /**
- * Gets the HCatalog table information.
- * @return the table information
- */
- public HCatTableInfo getTableInfo() {
- return tableInfo;
- }
+ /** The information about which input storage handler to use */
+ private final String storageHandlerClassName;
+ private final String inputFormatClassName;
+ private final String outputFormatClassName;
+ private final String serdeClassName;
+
+ /** HCat-specific properties set at the partition */
+ private final Properties hcatProperties;
+
+ /** The data location. */
+ private final String location;
+
+ /** The map of partition key names and their values. */
+ private Map<String, String> partitionValues;
+
+ /** Job properties associated with this partition */
+ Map<String, String> jobProperties;
+
+ /** the table info associated with this partition */
+ HCatTableInfo tableInfo;
+
+ /**
+ * Instantiates a new hcat partition info.
+ * @param partitionSchema the partition schema
+ * @param storageHandler the storage handler
+ * @param location the location
+ * @param hcatProperties hcat-specific properties at the partition
+ * @param jobProperties the job properties
+ * @param tableInfo the table information
+ */
+ public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
+ String location, Properties hcatProperties,
+ Map<String, String> jobProperties, HCatTableInfo tableInfo) {
+ this.partitionSchema = partitionSchema;
+ this.location = location;
+ this.hcatProperties = hcatProperties;
+ this.jobProperties = jobProperties;
+ this.tableInfo = tableInfo;
+
+ this.storageHandlerClassName = storageHandler.getClass().getName();
+ this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
+ this.serdeClassName = storageHandler.getSerDeClass().getName();
+ this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
+ }
+
+ /**
+ * Gets the value of partitionSchema.
+ * @return the partitionSchema
+ */
+ public HCatSchema getPartitionSchema() {
+ return partitionSchema;
+ }
+
+ /**
+ * @return the storage handler class name
+ */
+ public String getStorageHandlerClassName() {
+ return storageHandlerClassName;
+ }
+
+ /**
+ * @return the inputFormatClassName
+ */
+ public String getInputFormatClassName() {
+ return inputFormatClassName;
+ }
+
+ /**
+ * @return the outputFormatClassName
+ */
+ public String getOutputFormatClassName() {
+ return outputFormatClassName;
+ }
+
+ /**
+ * @return the serdeClassName
+ */
+ public String getSerdeClassName() {
+ return serdeClassName;
+ }
+
+ /**
+ * Gets the input storage handler properties.
+ * @return HCat-specific properties set at the partition
+ */
+ public Properties getInputStorageHandlerProperties() {
+ return hcatProperties;
+ }
+
+ /**
+ * Gets the value of location.
+ * @return the location
+ */
+ public String getLocation() {
+ return location;
+ }
+
+ /**
+ * Sets the partition values.
+ * @param partitionValues the new partition values
+ */
+ public void setPartitionValues(Map<String, String> partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ /**
+ * Gets the partition values.
+ * @return the partition values
+ */
+ public Map<String, String> getPartitionValues() {
+ return partitionValues;
+ }
+
+ /**
+ * Gets the job properties.
+ * @return a map of the job properties
+ */
+ public Map<String, String> getJobProperties() {
+ return jobProperties;
+ }
+
+ /**
+ * Gets the HCatalog table information.
+ * @return the table information
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
}
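
PartInfo stores the storage handler, input format, output format and serde as class-name
strings rather than live objects, which keeps the class Serializable and lets the named
classes be re-instantiated on the task side. A minimal sketch of that pattern (not
HCatalog's actual code path; the class and method names PartInfoSketch/inputFormatFor
are hypothetical, and the PartInfo is assumed to come from a deserialized split):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.hcatalog.mapreduce.PartInfo;

    public class PartInfoSketch {
        // Re-instantiate the InputFormat a PartInfo names; the class name was
        // captured from the storage handler when the PartInfo was built.
        static InputFormat<?, ?> inputFormatFor(PartInfo partInfo, Configuration conf)
                throws ClassNotFoundException {
            Class<?> ifClass = Class.forName(partInfo.getInputFormatClassName());
            return (InputFormat<?, ?>) ReflectionUtils.newInstance(ifClass, conf);
        }
    }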
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Mon Sep 10 23:28:55 2012
@@ -27,65 +27,65 @@ import org.apache.hadoop.mapreduce.TaskI
class ProgressReporter extends StatusReporter implements Reporter {
- private TaskInputOutputContext context = null;
- private TaskAttemptContext taskAttemptContext = null;
+ private TaskInputOutputContext context = null;
+ private TaskAttemptContext taskAttemptContext = null;
- public ProgressReporter(TaskAttemptContext context) {
- if (context instanceof TaskInputOutputContext) {
- this.context = (TaskInputOutputContext) context;
- } else {
- taskAttemptContext = context;
- }
- }
-
- @Override
- public void setStatus(String status) {
- if (context != null) {
- context.setStatus(status);
- }
- }
-
- @Override
- public Counters.Counter getCounter(Enum<?> name) {
- return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
- }
-
- @Override
- public Counters.Counter getCounter(String group, String name) {
- return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
- }
-
- @Override
- public void incrCounter(Enum<?> key, long amount) {
- if (context != null) {
- context.getCounter(key).increment(amount);
- }
- }
-
- @Override
- public void incrCounter(String group, String counter, long amount) {
- if (context != null) {
- context.getCounter(group, counter).increment(amount);
- }
- }
-
- @Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
- return null;
- }
-
- public float getProgress() {
- /* Required to build against 0.23 Reporter and StatusReporter. */
- /* TODO: determine the progress. */
- return 0.0f;
- }
-
- @Override
- public void progress() {
- if (context != null) {
- context.progress();
- } else {
- taskAttemptContext.progress();
+ public ProgressReporter(TaskAttemptContext context) {
+ if (context instanceof TaskInputOutputContext) {
+ this.context = (TaskInputOutputContext) context;
+ } else {
+ taskAttemptContext = context;
+ }
+ }
+
+ @Override
+ public void setStatus(String status) {
+ if (context != null) {
+ context.setStatus(status);
+ }
+ }
+
+ @Override
+ public Counters.Counter getCounter(Enum<?> name) {
+ return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+ }
+
+ @Override
+ public Counters.Counter getCounter(String group, String name) {
+ return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+ if (context != null) {
+ context.getCounter(key).increment(amount);
+ }
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+ if (context != null) {
+ context.getCounter(group, counter).increment(amount);
+ }
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ public float getProgress() {
+ /* Required to build against 0.23 Reporter and StatusReporter. */
+ /* TODO: determine the progress. */
+ return 0.0f;
+ }
+
+ @Override
+ public void progress() {
+ if (context != null) {
+ context.progress();
+ } else {
+ taskAttemptContext.progress();
+ }
}
- }
}
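
ProgressReporter is an adapter: it lets old-API (org.apache.hadoop.mapred) readers and
writers report through a new-API context, forwarding status and counters only when the
wrapped context is a TaskInputOutputContext. A minimal same-package sketch (the class is
package-private; taskContext is assumed to be the framework-supplied TaskAttemptContext):

    ProgressReporter reporter = new ProgressReporter(taskContext);
    reporter.progress();                         // forwarded in either case
    reporter.setStatus("reading partition");     // no-op unless taskContext is a
                                                 // TaskInputOutputContext
    reporter.incrCounter("hcat", "records", 1L); // likewise guarded on the context type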
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/Security.java Mon Sep 10 23:28:55 2012
@@ -44,134 +44,134 @@ import org.slf4j.LoggerFactory;
final class Security {
- private static final Logger LOG = LoggerFactory.getLogger(HCatOutputFormat.class);
-
- // making sure this is not initialized unless needed
- private static final class LazyHolder {
- public static final Security INSTANCE = new Security();
- }
-
- public static Security getInstance() {
- return LazyHolder.INSTANCE;
- }
-
- boolean isSecurityEnabled() {
- try {
- Method m = UserGroupInformation.class.getMethod("isSecurityEnabled");
- return (Boolean)m.invoke(null, (Object[])null);
- } catch (NoSuchMethodException e) {
- LOG.info("Security is not supported by this version of hadoop.", e);
- } catch (InvocationTargetException e) {
- String msg = "Failed to call isSecurityEnabled()";
- LOG.info(msg, e);
- throw new IllegalStateException(msg,e);
- } catch (IllegalAccessException e) {
- String msg = "Failed to call isSecurityEnabled()";
- LOG.info(msg, e);
- throw new IllegalStateException(msg,e);
- }
- return false;
- }
-
- // a signature string to associate with a HCatTableInfo - essentially
- // a concatenation of dbname, tablename and partition keyvalues.
- String getTokenSignature(OutputJobInfo outputJobInfo) {
- StringBuilder result = new StringBuilder("");
- String dbName = outputJobInfo.getDatabaseName();
- if(dbName != null) {
- result.append(dbName);
- }
- String tableName = outputJobInfo.getTableName();
- if(tableName != null) {
- result.append("." + tableName);
- }
- Map<String, String> partValues = outputJobInfo.getPartitionValues();
- if(partValues != null) {
- for(Entry<String, String> entry: partValues.entrySet()) {
- result.append("/");
- result.append(entry.getKey());
- result.append("=");
- result.append(entry.getValue());
- }
-
- }
- return result.toString();
- }
-
- void handleSecurity(
- Job job,
- OutputJobInfo outputJobInfo,
- HiveMetaStoreClient client,
- Configuration conf,
- boolean harRequested)
- throws IOException, MetaException, TException, Exception {
- if(UserGroupInformation.isSecurityEnabled()){
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- // check if oozie has set up a hcat deleg. token - if so use it
- TokenSelector<? extends TokenIdentifier> hiveTokenSelector = new DelegationTokenSelector();
- //Oozie does not change the service field of the token
- //hence by default token generation will have a value of "new Text("")"
- //HiveClient will look for a use TokenSelector.selectToken() with service
- //set to empty "Text" if hive.metastore.token.signature property is set to null
- Token<? extends TokenIdentifier> hiveToken = hiveTokenSelector.selectToken(
- new Text(), ugi.getTokens());
- if(hiveToken == null) {
- // we did not get token set up by oozie, let's get them ourselves here.
- // we essentially get a token per unique Output HCatTableInfo - this is
- // done because through Pig, setOutput() method is called multiple times
- // We want to only get the token once per unique output HCatTableInfo -
- // we cannot just get one token since in multi-query case (> 1 store in 1 job)
- // or the case when a single pig script results in > 1 jobs, the single
- // token will get cancelled by the output committer and the subsequent
- // stores will fail - by tying the token with the concatenation of
- // dbname, tablename and partition keyvalues of the output
- // TableInfo, we can have as many tokens as there are stores and the TokenSelector
- // will correctly pick the right tokens which the committer will use and
- // cancel.
- String tokenSignature = getTokenSignature(outputJobInfo);
- // get delegation tokens from hcat server and store them into the "job"
- // These will be used in to publish partitions to
- // hcat normally in OutputCommitter.commitJob()
- // when the JobTracker in Hadoop MapReduce starts supporting renewal of
- // arbitrary tokens, the renewer should be the principal of the JobTracker
- hiveToken = HCatUtil.extractThriftToken(client.getDelegationToken(ugi.getUserName()), tokenSignature);
-
- if (harRequested){
- TokenSelector<? extends TokenIdentifier> jtTokenSelector =
- new org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector();
- Token jtToken = jtTokenSelector.selectToken(org.apache.hadoop.security.SecurityUtil.buildTokenService(
- HCatHadoopShims.Instance.get().getResourceManagerAddress(conf)), ugi.getTokens());
- if(jtToken == null) {
- //we don't need to cancel this token as the TokenRenewer for JT tokens
- //takes care of cancelling them
- job.getCredentials().addToken(new Text("hcat jt token"),
- HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
- }
- }
-
- job.getCredentials().addToken(new Text(ugi.getUserName() +"_"+ tokenSignature), hiveToken);
- // this will be used by the outputcommitter to pass on to the metastore client
- // which in turn will pass on to the TokenSelector so that it can select
- // the right token.
- job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
- }
- }
- }
-
- // we should cancel hcat token if it was acquired by hcat
- // and not if it was supplied (ie Oozie). In the latter
- // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
- void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
- String tokenStrForm = client.getTokenStrForm();
- if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
- try {
- client.cancelDelegationToken(tokenStrForm);
- } catch (TException e) {
- String msg = "Failed to cancel delegation token";
- LOG.error(msg,e);
- throw new IOException(msg,e);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(HCatOutputFormat.class);
+
+ // making sure this is not initialized unless needed
+ private static final class LazyHolder {
+ public static final Security INSTANCE = new Security();
+ }
+
+ public static Security getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ boolean isSecurityEnabled() {
+ try {
+ Method m = UserGroupInformation.class.getMethod("isSecurityEnabled");
+ return (Boolean) m.invoke(null, (Object[]) null);
+ } catch (NoSuchMethodException e) {
+ LOG.info("Security is not supported by this version of hadoop.", e);
+ } catch (InvocationTargetException e) {
+ String msg = "Failed to call isSecurityEnabled()";
+ LOG.info(msg, e);
+ throw new IllegalStateException(msg, e);
+ } catch (IllegalAccessException e) {
+ String msg = "Failed to call isSecurityEnabled()";
+ LOG.info(msg, e);
+ throw new IllegalStateException(msg, e);
+ }
+ return false;
+ }
+
+ // a signature string to associate with an HCatTableInfo - essentially
+ // a concatenation of dbname, tablename and partition key-values.
+ String getTokenSignature(OutputJobInfo outputJobInfo) {
+ StringBuilder result = new StringBuilder("");
+ String dbName = outputJobInfo.getDatabaseName();
+ if (dbName != null) {
+ result.append(dbName);
+ }
+ String tableName = outputJobInfo.getTableName();
+ if (tableName != null) {
+ result.append("." + tableName);
+ }
+ Map<String, String> partValues = outputJobInfo.getPartitionValues();
+ if (partValues != null) {
+ for (Entry<String, String> entry : partValues.entrySet()) {
+ result.append("/");
+ result.append(entry.getKey());
+ result.append("=");
+ result.append(entry.getValue());
+ }
+
+ }
+ return result.toString();
+ }
+
+ void handleSecurity(
+ Job job,
+ OutputJobInfo outputJobInfo,
+ HiveMetaStoreClient client,
+ Configuration conf,
+ boolean harRequested)
+ throws IOException, MetaException, TException, Exception {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // check if Oozie has set up an HCat delegation token - if so, use it
+ TokenSelector<? extends TokenIdentifier> hiveTokenSelector = new DelegationTokenSelector();
+ //Oozie does not change the service field of the token;
+ //hence, by default, the generated token's service will be new Text("").
+ //HiveClient will look for a token via TokenSelector.selectToken() with the
+ //service set to empty Text if the hive.metastore.token.signature property is null.
+ Token<? extends TokenIdentifier> hiveToken = hiveTokenSelector.selectToken(
+ new Text(), ugi.getTokens());
+ if (hiveToken == null) {
+ // we did not get a token set up by Oozie, so let's get tokens ourselves here.
+ // We fetch one token per unique output HCatTableInfo. This is
+ // done because, through Pig, setOutput() is called multiple times,
+ // and we want to fetch the token only once per unique output HCatTableInfo.
+ // We cannot share a single token: in the multi-query case (> 1 store in 1 job),
+ // or when a single Pig script results in > 1 jobs, the single
+ // token would get cancelled by the output committer and the subsequent
+ // stores would fail. By keying each token on the concatenation of
+ // dbname, tablename and partition key-values of the output
+ // TableInfo, we get as many tokens as there are stores, and the TokenSelector
+ // will pick the right token for the committer to use and
+ // cancel.
+ String tokenSignature = getTokenSignature(outputJobInfo);
+ // get delegation tokens from the hcat server and store them in the "job".
+ // These will be used to publish partitions to
+ // hcat, normally in OutputCommitter.commitJob().
+ // When the JobTracker in Hadoop MapReduce starts supporting renewal of
+ // arbitrary tokens, the renewer should be the principal of the JobTracker.
+ hiveToken = HCatUtil.extractThriftToken(client.getDelegationToken(ugi.getUserName()), tokenSignature);
+
+ if (harRequested) {
+ TokenSelector<? extends TokenIdentifier> jtTokenSelector =
+ new org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector();
+ Token jtToken = jtTokenSelector.selectToken(org.apache.hadoop.security.SecurityUtil.buildTokenService(
+ HCatHadoopShims.Instance.get().getResourceManagerAddress(conf)), ugi.getTokens());
+ if (jtToken == null) {
+ //we don't need to cancel this token as the TokenRenewer for JT tokens
+ //takes care of cancelling them
+ job.getCredentials().addToken(new Text("hcat jt token"),
+ HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName()));
+ }
+ }
+
+ job.getCredentials().addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
+ // this will be used by the outputcommitter to pass on to the metastore client
+ // which in turn will pass on to the TokenSelector so that it can select
+ // the right token.
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+ }
+ }
+ }
+
+ // we should cancel the hcat token only if it was acquired by hcat,
+ // and not if it was supplied externally (i.e., by Oozie). In the latter
+ // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set.
+ void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
+ String tokenStrForm = client.getTokenStrForm();
+ if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ try {
+ client.cancelDelegationToken(tokenStrForm);
+ } catch (TException e) {
+ String msg = "Failed to cancel delegation token";
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
}
- }
}
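
For concreteness, getTokenSignature() produces strings of the form dbname + "." +
tablename followed by "/key=value" per partition entry, in the map's iteration order. A
worked trace with assumed values:

    // Assumed job: writing to mydb.mytable, partition {ds=20120910, region=us}.
    StringBuilder sig = new StringBuilder();
    sig.append("mydb");            // database name
    sig.append("." + "mytable");   // "." + table name
    sig.append("/ds=20120910");    // "/" + key + "=" + value, per entry
    sig.append("/region=us");
    // sig.toString() -> "mydb.mytable/ds=20120910/region=us"
    // Two stores to different partitions thus get distinct token signatures,
    // so the committer can cancel one token without breaking the other store.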
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java Mon Sep 10 23:28:55 2012
@@ -47,12 +47,12 @@ public class StorerInfo implements Seria
* @param properties the properties for the storage handler
*/
public StorerInfo(String ifClass, String ofClass, String serdeClass, String storageHandlerClass, Properties properties) {
- super();
- this.ifClass =ifClass;
- this.ofClass = ofClass;
- this.serdeClass = serdeClass;
- this.storageHandlerClass = storageHandlerClass;
- this.properties = properties;
+ super();
+ this.ifClass = ifClass;
+ this.ofClass = ofClass;
+ this.serdeClass = serdeClass;
+ this.storageHandlerClass = storageHandlerClass;
+ this.properties = properties;
}
/**
@@ -94,14 +94,14 @@ public class StorerInfo implements Seria
* @return the storer properties
*/
public Properties getProperties() {
- return properties;
+ return properties;
}
/**
* @param properties the storer properties to set
*/
public void setProperties(Properties properties) {
- this.properties = properties;
+ this.properties = properties;
}
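
StorerInfo is a plain serializable carrier for the four class names plus the
storage-handler properties. An illustrative construction, using RCFile classes as a
plausible example (the actual classes depend on the table's storage format, and passing
null for the storage handler of a native table is an assumption here):

    java.util.Properties props = new java.util.Properties();
    StorerInfo storer = new StorerInfo(
        "org.apache.hadoop.hive.ql.io.RCFileInputFormat",        // ifClass
        "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",       // ofClass
        "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",  // serdeClass
        null,     // storageHandlerClass - assumed absent for a native table
        props);   // storage-handler properties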
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java Mon Sep 10 23:28:55 2012
@@ -27,14 +27,14 @@ import org.apache.hcatalog.cli.SemanticA
public class JavaAction {
- public static void main(String[] args) throws Exception{
+ public static void main(String[] args) throws Exception {
- HiveConf conf = new HiveConf();
- conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
- conf.setVar(ConfVars.SEMANTIC_ANALYZER_HOOK, HCatSemanticAnalyzer.class.getName());
- conf.setBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL, true);
- SessionState.start(new CliSessionState(conf));
- new CliDriver().processLine(args[0]);
- }
+ HiveConf conf = new HiveConf();
+ conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+ conf.setVar(ConfVars.SEMANTIC_ANALYZER_HOOK, HCatSemanticAnalyzer.class.getName());
+ conf.setBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL, true);
+ SessionState.start(new CliSessionState(conf));
+ new CliDriver().processLine(args[0]);
+ }
}
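
As the body above shows, JavaAction points HiveConf at the Oozie-provided action
configuration, installs the HCat semantic analyzer hook, enables SASL for the metastore,
and hands args[0] to Hive's CliDriver. A workflow therefore passes the whole command as a
single argument string, for example (illustrative commands) "show tables;" or
"alter table mytable add partition (ds='20120910');".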