Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [7/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/ ...
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InitializeInput.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class that handles querying the metadata server using the MetaStoreClient. The list of
+ * partitions matching the partition filter is fetched from the server and the information is
+ * serialized and written into the JobContext configuration. The inputInfo is also updated with
+ * info required in the client process context.
+ */
+class InitializeInput {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InitializeInput.class);
+
+ /**
+ * @see org.apache.hcatalog.mapreduce.InitializeInput#setInput(org.apache.hadoop.conf.Configuration, InputJobInfo)
+ */
+ public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exception {
+ setInput(job.getConfiguration(), theirInputJobInfo);
+ }
+
+ /**
+ * Set the input to use for the Job. This queries the metadata server with the specified
+ * partition predicates, gets the matching partitions, and puts the information in the job
+ * configuration object.
+ *
+ * To ensure a known InputJobInfo state, only the database name, table name, filter, and
+ * properties are preserved. All other modifications to the given InputJobInfo are discarded.
+ *
+ * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
+ * <pre>{@code
+ * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
+ *   job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+ * }</pre>
+ *
+ * @param conf the job Configuration object
+ * @param theirInputJobInfo information on the Input to read
+ * @throws Exception
+ */
+ public static void setInput(Configuration conf,
+ InputJobInfo theirInputJobInfo) throws Exception {
+ InputJobInfo inputJobInfo = InputJobInfo.create(
+ theirInputJobInfo.getDatabaseName(),
+ theirInputJobInfo.getTableName(),
+ theirInputJobInfo.getFilter(),
+ theirInputJobInfo.getProperties());
+ conf.set(
+ HCatConstants.HCAT_KEY_JOB_INFO,
+ HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
+ }
+
+ /**
+ * Returns the given InputJobInfo after populating it with data queried from the metadata service.
+ */
+ private static InputJobInfo getInputJobInfo(
+ Configuration conf, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
+ HiveMetaStoreClient client = null;
+ HiveConf hiveConf = null;
+ try {
+ if (conf != null) {
+ hiveConf = HCatUtil.getHiveConf(conf);
+ } else {
+ hiveConf = new HiveConf(HCatInputFormat.class);
+ }
+ client = HCatUtil.getHiveClient(hiveConf);
+ Table table = HCatUtil.getTable(client, inputJobInfo.getDatabaseName(),
+ inputJobInfo.getTableName());
+
+ List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+ inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table.getTTable()));
+ if (table.getPartitionKeys().size() != 0) {
+ //Partitioned table
+ List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
+ inputJobInfo.getTableName(),
+ inputJobInfo.getFilter(),
+ (short) -1);
+
+ // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
+ int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+ if (parts != null && parts.size() > maxPart) {
+ throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
+ }
+
+ // populate partition info
+ for (Partition ptn : parts) {
+ HCatSchema schema = HCatUtil.extractSchema(
+ new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
+ PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
+ ptn.getParameters(), conf, inputJobInfo);
+ partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
+ partInfoList.add(partInfo);
+ }
+
+ } else {
+ //Non partitioned table
+ HCatSchema schema = HCatUtil.extractSchema(table);
+ PartInfo partInfo = extractPartInfo(schema, table.getTTable().getSd(),
+ table.getParameters(), conf, inputJobInfo);
+ partInfo.setPartitionValues(new HashMap<String, String>());
+ partInfoList.add(partInfo);
+ }
+ inputJobInfo.setPartitions(partInfoList);
+
+ return inputJobInfo;
+ } finally {
+ HCatUtil.closeHiveClientQuietly(client);
+ }
+
+ }
+
+ private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
+ Map<String, String> parameters, Configuration conf,
+ InputJobInfo inputJobInfo) throws IOException {
+
+ StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd, parameters);
+
+ Properties hcatProperties = new Properties();
+ HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, storerInfo);
+
+ // copy the properties from storageHandler to jobProperties
+ Map<String, String> jobProperties = HCatUtil.getInputJobProperties(storageHandler, inputJobInfo);
+
+ for (String key : parameters.keySet()) {
+ hcatProperties.put(key, parameters.get(key));
+ }
+ // FIXME
+ // Bloating partinfo with inputJobInfo is not good
+ return new PartInfo(schema, storageHandler, sd.getLocation(),
+ hcatProperties, jobProperties, inputJobInfo.getTableInfo());
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InputJobInfo.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Container for metadata read from the metadata server.
+ * Prior to release 0.5, InputJobInfo was a key part of the public API, exposed directly
+ * to end-users as an argument to
+ * {@link HCatInputFormat#setInput(org.apache.hadoop.mapreduce.Job, InputJobInfo)}.
+ * Going forward, we plan to treat InputJobInfo as an implementation detail and will no longer
+ * expose it to end-users. Should you need to use InputJobInfo outside HCatalog itself,
+ * please contact the developer mailing list before depending on this class.
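+ * <p>
+ * For illustration only (database and table names below are made-up), the pre-0.5
+ * usage was roughly:
+ * <pre>{@code
+ * InputJobInfo inputJobInfo = InputJobInfo.create("mydb", "mytable", null, null);
+ * HCatInputFormat.setInput(job, inputJobInfo);
+ * }</pre>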
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InputJobInfo implements Serializable {
+
+ /** The serialization version */
+ private static final long serialVersionUID = 1L;
+
+ /** The db and table names. */
+ private final String databaseName;
+ private final String tableName;
+
+ /** meta information of the table to be read from */
+ private HCatTableInfo tableInfo;
+
+ /** The partition filter */
+ private String filter;
+
+ /** The list of partitions matching the filter. */
+ transient private List<PartInfo> partitions;
+
+ /** implementation specific job properties */
+ private Properties properties;
+
+ /**
+ * Initializes a new InputJobInfo
+ * for reading data from a table.
+ * @param databaseName the db name
+ * @param tableName the table name
+ * @param filter the partition filter
+ * @param properties implementation specific job properties
+ */
+ public static InputJobInfo create(String databaseName,
+ String tableName,
+ String filter,
+ Properties properties) {
+ return new InputJobInfo(databaseName, tableName, filter, properties);
+ }
+
+ /**
+ * Initializes a new InputJobInfo
+ * for reading data from a table.
+ * @param databaseName the db name
+ * @param tableName the table name
+ * @param filter the partition filter
+ */
+ @Deprecated
+ public static InputJobInfo create(String databaseName,
+ String tableName,
+ String filter) {
+ return create(databaseName, tableName, filter, null);
+ }
+
+
+ private InputJobInfo(String databaseName,
+ String tableName,
+ String filter,
+ Properties properties) {
+ this.databaseName = (databaseName == null) ?
+ MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+ this.tableName = tableName;
+ this.filter = filter;
+ this.properties = properties == null ? new Properties() : properties;
+ }
+
+ /**
+ * Gets the value of databaseName
+ * @return the databaseName
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * Gets the value of tableName
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Gets the table's meta information
+ * @return the HCatTableInfo
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ /**
+ * Set the tableInfo instance. This should be the same instance
+ * determined by this object's databaseName and tableName.
+ * @param tableInfo the table information
+ */
+ void setTableInfo(HCatTableInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ }
+
+ /**
+ * Gets the value of partition filter
+ * @return the filter string
+ */
+ public String getFilter() {
+ return filter;
+ }
+
+ /**
+ * @return partition info
+ */
+ public List<PartInfo> getPartitions() {
+ return partitions;
+ }
+
+ /**
+ * @param partitions the list of matching partitions to set
+ */
+ void setPartitions(List<PartInfo> partitions) {
+ this.partitions = partitions;
+ }
+
+ /**
+ * Gets the mutable Properties object that is passed down to the *StorageHandler implementation.
+ * Put implementation-specific storage handler configurations here.
+ * @return the implementation specific job properties
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Serialize this object, compressing the partitions which can exceed the
+ * allowed jobConf size.
+ * @see <a href="https://issues.apache.org/jira/browse/HCATALOG-453">HCATALOG-453</a>
+ */
+ private void writeObject(ObjectOutputStream oos)
+ throws IOException {
+ oos.defaultWriteObject();
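+ // 'partitions' is transient, so defaultWriteObject() skips it; write it
+ // separately through a Deflater-compressed stream to keep the serialized
+ // form within jobConf size limits (HCATALOG-453).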
+ Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
+ ObjectOutputStream partInfoWriter =
+ new ObjectOutputStream(new DeflaterOutputStream(oos, def));
+ partInfoWriter.writeObject(partitions);
+ partInfoWriter.close();
+ }
+
+ /**
+ * Deserialize this object, decompressing the partitions which can exceed the
+ * allowed jobConf size.
+ * @see <a href="https://issues.apache.org/jira/browse/HCATALOG-453">HCATALOG-453</a>
+ */
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream ois)
+ throws IOException, ClassNotFoundException {
+ ois.defaultReadObject();
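+ // Mirror of writeObject(): read the partition list back from the
+ // compressed nested stream.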
+ ObjectInputStream partInfoReader =
+ new ObjectInputStream(new InflaterInputStream(ois));
+ partitions = (List<PartInfo>)partInfoReader.readObject();
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/InternalUtil.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class InternalUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(InternalUtil.class);
+
+ static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
+ Properties hcatProperties = new Properties();
+ for (String key : properties.keySet()) {
+ hcatProperties.put(key, properties.get(key));
+ }
+
+ // also populate with StorageDescriptor->SerDe.Parameters
+ for (Map.Entry<String, String> param :
+ sd.getSerdeInfo().getParameters().entrySet()) {
+ hcatProperties.put(param.getKey(), param.getValue());
+ }
+
+
+ return new StorerInfo(
+ sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
+ properties.get(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE),
+ hcatProperties);
+ }
+
+ static StructObjectInspector createStructObjectInspector(HCatSchema outputSchema) throws IOException {
+
+ if (outputSchema == null) {
+ throw new IOException("Invalid output schema specified");
+ }
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ List<String> fieldNames = new ArrayList<String>();
+
+ for (HCatFieldSchema hcatFieldSchema : outputSchema.getFields()) {
+ TypeInfo type = TypeInfoUtils.getTypeInfoFromTypeString(hcatFieldSchema.getTypeString());
+
+ fieldNames.add(hcatFieldSchema.getName());
+ fieldInspectors.add(getObjectInspector(type));
+ }
+
+ StructObjectInspector structInspector = ObjectInspectorFactory.
+ getStandardStructObjectInspector(fieldNames, fieldInspectors);
+ return structInspector;
+ }
+
+ private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
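+ // Recursively build a standard Java ObjectInspector that mirrors the Hive
+ // TypeInfo: primitives map directly, while MAP, LIST and STRUCT recurse into
+ // their key/value, element and field types.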
+
+ switch (type.getCategory()) {
+
+ case PRIMITIVE:
+ PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+ return PrimitiveObjectInspectorFactory.
+ getPrimitiveJavaObjectInspector(primitiveType.getPrimitiveCategory());
+
+ case MAP:
+ MapTypeInfo mapType = (MapTypeInfo) type;
+ MapObjectInspector mapInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
+ getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+ return mapInspector;
+
+ case LIST:
+ ListTypeInfo listType = (ListTypeInfo) type;
+ ListObjectInspector listInspector = ObjectInspectorFactory.getStandardListObjectInspector(
+ getObjectInspector(listType.getListElementTypeInfo()));
+ return listInspector;
+
+ case STRUCT:
+ StructTypeInfo structType = (StructTypeInfo) type;
+ List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ for (TypeInfo fieldType : fieldTypes) {
+ fieldInspectors.add(getObjectInspector(fieldType));
+ }
+
+ StructObjectInspector structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ structType.getAllStructFieldNames(), fieldInspectors);
+ return structInspector;
+
+ default:
+ throw new IOException("Unknown field schema type");
+ }
+ }
+
+ //TODO: this has to find a better home; it's also hardcoded as the default in Hive.
+ // It would be nice if the default were decided by the SerDe.
+ static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo)
+ throws SerDeException {
+ serDe.initialize(conf, getSerdeProperties(jobInfo.getTableInfo(), jobInfo.getOutputSchema()));
+ }
+
+ static void initializeDeserializer(Deserializer deserializer, Configuration conf,
+ HCatTableInfo info, HCatSchema schema) throws SerDeException {
+ Properties props = getSerdeProperties(info, schema);
+ LOG.info("Initializing " + deserializer.getClass().getName() + " with properties " + props);
+ deserializer.initialize(conf, props);
+ }
+
+ private static Properties getSerdeProperties(HCatTableInfo info, HCatSchema s)
+ throws SerDeException {
+ Properties props = new Properties();
+ List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
+ props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS,
+ MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+ props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES,
+ MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+ // setting these props to match LazySimpleSerde
+ props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "\\N");
+ props.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, "1");
+
+ //add props from params set in table schema
+ props.putAll(info.getStorerInfo().getProperties());
+
+ return props;
+ }
+
+ static Reporter createReporter(TaskAttemptContext context) {
+ return new ProgressReporter(context);
+ }
+
+ /**
+ * Casts an InputSplit into a HCatSplit, providing a useful error message if the cast fails.
+ * @param split the InputSplit
+ * @return the HCatSplit
+ * @throws IOException
+ */
+ public static HCatSplit castToHCatSplit(InputSplit split) throws IOException {
+ if (split instanceof HCatSplit) {
+ return (HCatSplit) split;
+ } else {
+ throw new IOException("Split must be " + HCatSplit.class.getName()
+ + " but found " + split.getClass().getName());
+ }
+ }
+
+
+ static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn)
+ throws IOException {
+ List<String> values = ptn.getValues();
+ if (values.size() != table.getPartitionKeys().size()) {
+ throw new IOException(
+ "Partition values in partition inconsistent with table definition, table "
+ + table.getTableName() + " has "
+ + table.getPartitionKeys().size()
+ + " partition keys, partition has " + values.size()
+ + "partition values");
+ }
+
+ Map<String, String> ptnKeyValues = new HashMap<String, String>();
+
+ int i = 0;
+ for (FieldSchema schema : table.getPartitionKeys()) {
+ // CONCERN : the way this mapping goes, the order *needs* to be
+ // preserved for table.getPartitionKeys() and ptn.getValues()
+ ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+ i++;
+ }
+
+ return ptnKeyValues;
+ }
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The MultiOutputFormat class simplifies writing output data to multiple
+ * outputs.
+ * <p>
+ * Multiple output formats can be defined, each with its own
+ * <code>OutputFormat</code> class, key class and value class. Any
+ * configuration on these output format classes can be done without interfering
+ * with the other output formats' configurations.
+ * <p>
+ * Usage pattern for job submission:
+ *
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ *
+ * job.setMapperClass(WordCountMap.class);
+ * job.setReducerClass(WordCountReduce.class);
+ * job.setInputFormatClass(TextInputFormat.class);
+ * job.setOutputFormatClass(MultiOutputFormat.class);
+ * // Need not define OutputKeyClass and OutputValueClass. They default to
+ * // Writable.class
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(IntWritable.class);
+ *
+ *
+ * // Create a JobConfigurer that will configure the job with the multiple
+ * // output format information.
+ * JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ *
+ * // Defines an additional text-based output 'text' for the job.
+ * // Any configuration for the defined OutputFormat should be done with
+ * // the Job obtained with configurer.getJob() method.
+ * configurer.addOutputFormat("text", TextOutputFormat.class,
+ * IntWritable.class, Text.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir);
+ *
+ * // Defines an additional sequence-file based output 'sequence' for the job
+ * configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class,
+ * Text.class, IntWritable.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir);
+ * ...
+ * // configure method to be called on the JobConfigurer once all the
+ * // output formats have been defined and configured.
+ * configurer.configure();
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ *
+ * <pre>
+ * public class WordCountReduce extends
+ * Reducer<Text, IntWritable, Writable, Writable> {
+ *
+ * private IntWritable count = new IntWritable();
+ *
+ * public void reduce(Text word, Iterator<IntWritable> values,
+ * Context context)
+ * throws IOException {
+ * int sum = 0;
+ * for (IntWritable val : values) {
+ * sum += val.get();
+ * }
+ * count.set(sum);
+ * MultiOutputFormat.write("text", count, word, context);
+ * MultiOutputFormat.write("sequence", word, count, context);
+ * }
+ *
+ * }
+ *
+ * </pre>
+ *
+ * Map only jobs:
+ * <p>
+ * MultiOutputFormat.write("output", key, value, context); can be called in
+ * map-only jobs the same way it is called in a reducer.
+ *
+ */
+public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName());
+ private static final String MO_ALIASES = "mapreduce.multiout.aliases";
+ private static final String MO_ALIAS = "mapreduce.multiout.alias";
+ private static final String CONF_KEY_DELIM = "%%";
+ private static final String CONF_VALUE_DELIM = ";;";
+ private static final String COMMA_DELIM = ",";
+ private static final List<String> configsToOverride = new ArrayList<String>();
+ private static final Map<String, String> configsToMerge = new HashMap<String, String>();
+
+ static {
+ configsToOverride.add("mapred.output.dir");
+ configsToOverride.add(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_SYMLINK));
+ configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM);
+ configsToMerge.put("tmpfiles", COMMA_DELIM);
+ configsToMerge.put("tmpjars", COMMA_DELIM);
+ configsToMerge.put("tmparchives", COMMA_DELIM);
+ configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_ARCHIVES), COMMA_DELIM);
+ configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_FILES), COMMA_DELIM);
+ String fileSep;
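+ // Hadoop 0.23+ uses a comma separator for these classpath configs; older
+ // versions use the platform path separator.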
+ if (HCatUtil.isHadoop23()) {
+ fileSep = ",";
+ } else {
+ fileSep = System.getProperty("path.separator");
+ }
+ configsToMerge.put("mapred.job.classpath.archives", fileSep);
+ configsToMerge.put("mapred.job.classpath.files", fileSep);
+ }
+
+ /**
+ * Get a JobConfigurer instance that will support configuration of the job
+ * for multiple output formats.
+ *
+ * @param job the mapreduce job to be submitted
+ * @return JobConfigurer
+ */
+ public static JobConfigurer createConfigurer(Job job) {
+ return JobConfigurer.create(job);
+ }
+
+ /**
+ * Get the JobContext with the related OutputFormat configuration populated given the alias
+ * and the actual JobContext
+ * @param alias the name given to the OutputFormat configuration
+ * @param context the JobContext
+ * @return a copy of the JobContext with the alias configuration populated
+ */
+ public static JobContext getJobContext(String alias, JobContext context) {
+ String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+ JobContext aliasContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(), context.getJobID());
+ addToConfig(aliasConf, aliasContext.getConfiguration());
+ return aliasContext;
+ }
+
+ /**
+ * Get the TaskAttemptContext with the related OutputFormat configuration populated given the alias
+ * and the actual TaskAttemptContext
+ * @param alias the name given to the OutputFormat configuration
+ * @param context the Mapper or Reducer Context
+ * @return a copy of the TaskAttemptContext with the alias configuration populated
+ */
+ public static TaskAttemptContext getTaskAttemptContext(String alias, TaskAttemptContext context) {
+ String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+ TaskAttemptContext aliasContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(
+ context.getConfiguration(), context.getTaskAttemptID());
+ addToConfig(aliasConf, aliasContext.getConfiguration());
+ return aliasContext;
+ }
+
+ /**
+ * Write the output key and value using the OutputFormat defined by the
+ * alias.
+ *
+ * @param alias the name given to the OutputFormat configuration
+ * @param key the output key to be written
+ * @param value the output value to be written
+ * @param context the Mapper or Reducer Context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
+ throws IOException, InterruptedException {
+ KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
+ context.write(new Text(alias), keyval);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ for (String alias : getOutputFormatAliases(context)) {
+ LOGGER.debug("Calling checkOutputSpecs for alias: " + alias);
+ JobContext aliasContext = getJobContext(alias, context);
+ OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+ outputFormat.checkOutputSpecs(aliasContext);
+ // Copy credentials and any new config added back to JobContext
+ context.getCredentials().addAll(aliasContext.getCredentials());
+ setAliasConf(alias, context, aliasContext);
+ }
+ }
+
+ @Override
+ public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
+ throws IOException,
+ InterruptedException {
+ return new MultiRecordWriter(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new MultiOutputCommitter(context);
+ }
+
+ private static OutputFormat<?, ?> getOutputFormatInstance(JobContext context) {
+ OutputFormat<?, ?> outputFormat;
+ try {
+ outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(),
+ context.getConfiguration());
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ return outputFormat;
+ }
+
+ private static String[] getOutputFormatAliases(JobContext context) {
+ return context.getConfiguration().getStrings(MO_ALIASES);
+ }
+
+ /**
+ * Compare the aliasContext with userJob and add the differing configuration
+ * as mapreduce.multiout.alias.&lt;aliasname&gt;.conf to the userJob.
+ * <p>
+ * Merge config like tmpjars, tmpfile, tmparchives,
+ * mapreduce.job.hdfs-servers that are directly handled by JobClient and add
+ * them to userJob.
+ * <p>
+ * Add mapred.output.dir config to userJob.
+ *
+ * @param alias alias name associated with a OutputFormat
+ * @param userJob reference to Job that the user is going to submit
+ * @param aliasContext JobContext populated with OutputFormat related
+ * configuration.
+ */
+ private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) {
+ Configuration userConf = userJob.getConfiguration();
+ StringBuilder builder = new StringBuilder();
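+ // For each key whose value differs from the user job's configuration:
+ // merge list-style configs, override whitelisted configs directly, and
+ // record everything else in the per-alias conf string.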
+ for (Entry<String, String> conf : aliasContext.getConfiguration()) {
+ String key = conf.getKey();
+ String value = conf.getValue();
+ String jobValue = userConf.getRaw(key);
+ if (jobValue == null || !jobValue.equals(value)) {
+ if (configsToMerge.containsKey(key)) {
+ String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key));
+ userConf.set(key, mergedValue);
+ } else {
+ if (configsToOverride.contains(key)) {
+ userConf.set(key, value);
+ }
+ builder.append(key).append(CONF_KEY_DELIM).append(value)
+ .append(CONF_VALUE_DELIM);
+ }
+ }
+ }
+ if (builder.length() > CONF_VALUE_DELIM.length()) {
+ builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length());
+ userConf.set(getAliasConfName(alias), builder.toString());
+ }
+ }
+
+ private static String getMergedConfValue(String originalValues, String newValues, String separator) {
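+ // Merge two separator-delimited value lists, de-duplicating entries while
+ // preserving insertion order (LinkedHashSet).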
+ if (originalValues == null) {
+ return newValues;
+ }
+ Set<String> mergedValues = new LinkedHashSet<String>();
+ mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
+ mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
+ StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
+ for (String value : mergedValues) {
+ builder.append(value).append(separator);
+ }
+ return builder.substring(0, builder.length() - separator.length());
+ }
+
+ private static String getAliasConfName(String alias) {
+ return MO_ALIAS + "." + alias + ".conf";
+ }
+
+ private static void addToConfig(String aliasConf, Configuration conf) {
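+ // aliasConf is the flat string built by setAliasConf(): key%%value;;key%%value...
+ // Splitting on both delimiters yields alternating keys and values.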
+ String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM);
+ for (int i = 0; i < config.length; i += 2) {
+ conf.set(config[i], config[i + 1]);
+ }
+ }
+
+ /**
+ * Class that supports configuration of the job for multiple output formats.
+ */
+ public static class JobConfigurer {
+
+ private final Job job;
+ private Map<String, Job> outputConfigs = new LinkedHashMap<String, Job>();
+
+ private JobConfigurer(Job job) {
+ this.job = job;
+ }
+
+ private static JobConfigurer create(Job job) {
+ JobConfigurer configurer = new JobConfigurer(job);
+ return configurer;
+ }
+
+ /**
+ * Add an OutputFormat configuration to the Job with an alias name.
+ *
+ * @param alias the name to be given to the OutputFormat configuration
+ * @param outputFormatClass OutputFormat class
+ * @param keyClass the key class for the output data
+ * @param valueClass the value class for the output data
+ * @throws IOException
+ */
+ public void addOutputFormat(String alias,
+ Class<? extends OutputFormat> outputFormatClass,
+ Class<?> keyClass, Class<?> valueClass) throws IOException {
+ Job copy = new Job(this.job.getConfiguration());
+ outputConfigs.put(alias, copy);
+ copy.setOutputFormatClass(outputFormatClass);
+ copy.setOutputKeyClass(keyClass);
+ copy.setOutputValueClass(valueClass);
+ }
+
+ /**
+ * Get the Job configuration for an OutputFormat defined by the alias
+ * name. The job returned by this method should be passed to the
+ * OutputFormat for any configuration instead of the Job that will be
+ * submitted to the JobClient.
+ *
+ * @param alias the name used for the OutputFormat during
+ * addOutputFormat
+ * @return Job
+ */
+ public Job getJob(String alias) {
+ Job copy = outputConfigs.get(alias);
+ if (copy == null) {
+ throw new IllegalArgumentException("OutputFormat with alias " + alias
+ + " has not beed added");
+ }
+ return copy;
+ }
+
+ /**
+ * Configure the job with the multiple output formats added. This method
+ * should be called after all the output formats have been added and
+ * configured and before the job submission.
+ */
+ public void configure() {
+ StringBuilder aliases = new StringBuilder();
+ Configuration jobConf = job.getConfiguration();
+ for (Entry<String, Job> entry : outputConfigs.entrySet()) {
+ // Copy credentials
+ job.getCredentials().addAll(entry.getValue().getCredentials());
+ String alias = entry.getKey();
+ aliases.append(alias).append(COMMA_DELIM);
+ // Store the differing configuration for each alias in the job
+ // as a setting.
+ setAliasConf(alias, job, entry.getValue());
+ }
+ aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length());
+ jobConf.set(MO_ALIASES, aliases.toString());
+ }
+
+ }
+
+ private static class KeyValue<K, V> implements Writable {
+ private final K key;
+ private final V value;
+
+ public KeyValue(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // Ignore. Not required as this will never be
+ // serialized/deserialized.
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Ignore. Not required as this will never be
+ // serialized/deserialized.
+ }
+ }
+
+ private static class MultiRecordWriter extends RecordWriter<Writable, Writable> {
+
+ private final Map<String, BaseRecordWriterContainer> baseRecordWriters;
+
+ public MultiRecordWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
+ String[] aliases = getOutputFormatAliases(context);
+ for (String alias : aliases) {
+ LOGGER.info("Creating record writer for alias: " + alias);
+ TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
+ Configuration aliasConf = aliasContext.getConfiguration();
+ // Create output directory if not already created.
+ String outDir = aliasConf.get("mapred.output.dir");
+ if (outDir != null) {
+ Path outputDir = new Path(outDir);
+ FileSystem fs = outputDir.getFileSystem(aliasConf);
+ if (!fs.exists(outputDir)) {
+ fs.mkdirs(outputDir);
+ }
+ }
+ OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+ baseRecordWriters.put(alias,
+ new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext),
+ aliasContext));
+ }
+ }
+
+ @Override
+ public void write(Writable key, Writable value) throws IOException, InterruptedException {
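+ // Tasks call MultiOutputFormat.write(alias, key, value, context), which emits
+ // (alias, KeyValue) pairs; unwrap them here and route the user key/value to
+ // the record writer registered for that alias.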
+ Text _key = (Text) key;
+ KeyValue _value = (KeyValue) value;
+ String alias = new String(_key.getBytes(), 0, _key.getLength());
+ BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias);
+ if (baseRWContainer == null) {
+ throw new IllegalArgumentException("OutputFormat with alias " + alias
+ + " has not been added");
+ }
+ baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue());
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) {
+ BaseRecordWriterContainer baseRWContainer = entry.getValue();
+ LOGGER.info("Closing record writer for alias: " + entry.getKey());
+ baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
+ }
+ }
+
+ }
+
+ private static class BaseRecordWriterContainer {
+
+ private final RecordWriter recordWriter;
+ private final TaskAttemptContext context;
+
+ public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) {
+ this.recordWriter = recordWriter;
+ this.context = context;
+ }
+
+ public RecordWriter getRecordWriter() {
+ return recordWriter;
+ }
+
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ public class MultiOutputCommitter extends OutputCommitter {
+
+ private final Map<String, BaseOutputCommitterContainer> outputCommitters;
+
+ public MultiOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
+ String[] aliases = getOutputFormatAliases(context);
+ for (String alias : aliases) {
+ LOGGER.info("Creating output committer for alias: " + alias);
+ TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
+ OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
+ .getOutputCommitter(aliasContext);
+ outputCommitters.put(alias,
+ new BaseOutputCommitterContainer(baseCommitter, aliasContext));
+ }
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling setupJob for alias: " + alias);
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling setupTask for alias: " + alias);
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+ boolean needTaskCommit = false;
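+ // A task needs a commit if any one of the wrapped committers needs it.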
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ needTaskCommit = needTaskCommit
+ || outputContainer.getBaseCommitter().needsTaskCommit(
+ outputContainer.getContext());
+ }
+ return needTaskCommit;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ OutputCommitter baseCommitter = outputContainer.getBaseCommitter();
+ TaskAttemptContext committerContext = outputContainer.getContext();
+ if (baseCommitter.needsTaskCommit(committerContext)) {
+ LOGGER.info("Calling commitTask for alias: " + alias);
+ baseCommitter.commitTask(committerContext);
+ }
+ }
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling abortTask for alias: " + alias);
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling commitJob for alias: " + alias);
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, State state) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling abortJob for alias: " + alias);
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
+ }
+ }
+ }
+
+ private static class BaseOutputCommitterContainer {
+
+ private final OutputCommitter outputCommitter;
+ private final TaskAttemptContext context;
+
+ public BaseOutputCommitterContainer(OutputCommitter outputCommitter,
+ TaskAttemptContext context) {
+ this.outputCommitter = outputCommitter;
+ this.context = context;
+ }
+
+ public OutputCommitter getBaseCommitter() {
+ return outputCommitter;
+ }
+
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+/**
+ * This class will contain an implementation of an OutputCommitter.
+ * See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class OutputCommitterContainer extends OutputCommitter {
+ private final org.apache.hadoop.mapred.OutputCommitter committer;
+
+ /**
+ * @param context current JobContext
+ * @param committer OutputCommitter that this instance will contain
+ */
+ public OutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter committer) {
+ this.committer = committer;
+ }
+
+ /**
+ * @return underlying OutputCommitter
+ */
+ public OutputCommitter getBaseOutputCommitter() {
+ return committer;
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * This container class is used to wrap OutputFormat implementations and augment them with
+ * behavior necessary to work with HCatalog (i.e. metastore updates, HCatalog delegation tokens, etc.).
+ * Containers are also used to provide storage-specific implementations of some HCatalog features (i.e. dynamic partitioning).
+ * Hence, users wishing to create storage-specific implementations of HCatalog features should implement this class and override
+ * HCatStorageHandler.getOutputFormatContainer(OutputFormat outputFormat) to return the implementation.
+ * By default DefaultOutputFormatContainer is used, which implements only the bare minimum; HCatalog features
+ * such as dynamic partitioning are not supported.
+ */
+abstract class OutputFormatContainer extends OutputFormat<WritableComparable<?>, HCatRecord> {
+ private org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of;
+
+ /**
+ * @param of OutputFormat this instance will contain
+ */
+ public OutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
+ this.of = of;
+ }
+
+ /**
+ * @return underlying OutputFormat
+ */
+ public org.apache.hadoop.mapred.OutputFormat getBaseOutputFormat() {
+ return of;
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize and store the output related information */
+public class OutputJobInfo implements Serializable {
+
+ /** The db and table names. */
+ private final String databaseName;
+ private final String tableName;
+
+ /** The serialization version. */
+ private static final long serialVersionUID = 1L;
+
+ /** The table info provided by user. */
+ private HCatTableInfo tableInfo;
+
+ /** The output schema. This is given to us by the user. This won't contain any
+ * partition columns, even if the user has specified them.
+ */
+ private HCatSchema outputSchema;
+
+ /** The location of the partition being written */
+ private String location;
+
+ /** The partition values to publish to, if used for output*/
+ private Map<String, String> partitionValues;
+
+ private List<Integer> posOfPartCols;
+ private List<Integer> posOfDynPartCols;
+
+ private Properties properties;
+
+ private int maxDynamicPartitions;
+
+ /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+ private List<String> dynamicPartitioningKeys;
+
+ private boolean harRequested;
+
+ /**
+ * Initializes a new OutputJobInfo instance
+ * for writing data from a table.
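+ * <p>
+ * For illustration only (database, table and partition names below are made-up):
+ * <pre>{@code
+ * Map<String, String> ptnValues = new HashMap<String, String>();
+ * ptnValues.put("ds", "20130906");
+ * OutputJobInfo outputJobInfo = OutputJobInfo.create("mydb", "mytable", ptnValues);
+ * }</pre>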
+ * @param databaseName the db name
+ * @param tableName the table name
+ * @param partitionValues The partition values to publish to. Can be null or an empty Map to
+ * indicate a write to an unpartitioned table. For partitioned tables, this map should
+ * contain keys for all partition columns with corresponding values.
+ */
+ public static OutputJobInfo create(String databaseName,
+ String tableName,
+ Map<String, String> partitionValues) {
+ return new OutputJobInfo(databaseName,
+ tableName,
+ partitionValues);
+ }
+
+ private OutputJobInfo(String databaseName,
+ String tableName,
+ Map<String, String> partitionValues) {
+ this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+ this.tableName = tableName;
+ this.partitionValues = partitionValues;
+ this.properties = new Properties();
+ }
+
+ /**
+ * @return the posOfPartCols
+ */
+ protected List<Integer> getPosOfPartCols() {
+ return posOfPartCols;
+ }
+
+ /**
+ * @return the posOfDynPartCols
+ */
+ protected List<Integer> getPosOfDynPartCols() {
+ return posOfDynPartCols;
+ }
+
+ /**
+ * @param posOfPartCols the posOfPartCols to set
+ */
+ protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+ // sorting the list in the descending order so that deletes happen back-to-front
+ Collections.sort(posOfPartCols, new Comparator<Integer>() {
+ @Override
+ public int compare(Integer earlier, Integer later) {
+ // Compare by value (not reference) to honor the Comparator contract for boxed Integers.
+ return later.compareTo(earlier);
+ }
+ });
+ this.posOfPartCols = posOfPartCols;
+ }
+
+ /**
+ * @param posOfDynPartCols the posOfDynPartCols to set
+ */
+ protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
+ // Important - no sorting here! We retain order, it's used to match with values at runtime
+ this.posOfDynPartCols = posOfDynPartCols;
+ }
+
+ /**
+ * @return the tableInfo
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ /**
+ * @return the outputSchema
+ */
+ public HCatSchema getOutputSchema() {
+ return outputSchema;
+ }
+
+ /**
+ * @param schema the outputSchema to set
+ */
+ public void setOutputSchema(HCatSchema schema) {
+ this.outputSchema = schema;
+ }
+
+ /**
+ * @return the location
+ */
+ public String getLocation() {
+ return location;
+ }
+
+ /**
+ * @param location location to write to
+ */
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ /**
+ * Sets the value of partitionValues
+ * @param partitionValues the partition values to set
+ */
+ void setPartitionValues(Map<String, String> partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ /**
+ * Gets the value of partitionValues
+ * @return the partitionValues
+ */
+ public Map<String, String> getPartitionValues() {
+ return partitionValues;
+ }
+
+ /**
+ * Set the tableInfo instance.
+ * This should be the same instance
+ * determined by this object's databaseName and tableName.
+ * @param tableInfo
+ */
+ void setTableInfo(HCatTableInfo tableInfo) {
+ this.tableInfo = tableInfo;
+ }
+
+ /**
+ * @return database name of table to write to
+ */
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+ * @return name of table to write to
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Gets the properties that will be passed down to the *StorageHandler implementation;
+ * put implementation-specific storage handler configurations here.
+ * @return the implementation-specific job properties
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Set maximum number of allowable dynamic partitions
+ * @param maxDynamicPartitions
+ */
+ public void setMaximumDynamicPartitions(int maxDynamicPartitions) {
+ this.maxDynamicPartitions = maxDynamicPartitions;
+ }
+
+ /**
+ * Returns maximum number of allowable dynamic partitions
+ * @return maximum number of allowable dynamic partitions
+ */
+ public int getMaxDynamicPartitions() {
+ return this.maxDynamicPartitions;
+ }
+
+ /**
+ * Sets whether or not hadoop archiving has been requested for this job
+ * @param harRequested
+ */
+ public void setHarRequested(boolean harRequested) {
+ this.harRequested = harRequested;
+ }
+
+ /**
+ * Returns whether or not hadoop archiving has been requested for this job
+ * @return whether or not hadoop archiving has been requested for this job
+ */
+ public boolean getHarRequested() {
+ return this.harRequested;
+ }
+
+ /**
+ * Returns whether or not Dynamic Partitioning is used
+ * @return whether or not dynamic partitioning is currently enabled and used
+ */
+ public boolean isDynamicPartitioningUsed() {
+ return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+ }
+
+ /**
+ * Sets the list of dynamic partitioning keys used for outputting without specifying all the keys
+ * @param dynamicPartitioningKeys
+ */
+ public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+ this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+ }
+
+ public List<String> getDynamicPartitioningKeys() {
+ return this.dynamicPartitioningKeys;
+ }
+
+}
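
For illustration only (a sketch, not part of the committed patch): how a client might describe a partitioned write with OutputJobInfo.create() and hand it to HCatOutputFormat.setOutput(), which is assumed to be the companion API added elsewhere in this change. The database "mydb", table "web_logs" and partition key "ds" are hypothetical.

// Editor's sketch: building an OutputJobInfo for a static-partition write.
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class OutputJobInfoExample {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-write-example");
    // Static partition values: one entry per partition column of the target table.
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "20130906");
    // Passing null (or an empty map) instead indicates an unpartitioned table,
    // or dynamic partitioning where the keys are inferred at write time.
    OutputJobInfo info = OutputJobInfo.create("mydb", "web_logs", partitionValues);
    HCatOutputFormat.setOutput(job, info); // serializes the info into the job configuration
  }
}
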
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/PartInfo.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize the partition information, read from the metadata server, that maps to a partition. */
+public class PartInfo implements Serializable {
+
+ /** The serialization version */
+ private static final long serialVersionUID = 1L;
+
+ /** The partition schema. */
+ private final HCatSchema partitionSchema;
+
+ /** The information about which input storage handler to use */
+ private final String storageHandlerClassName;
+ private final String inputFormatClassName;
+ private final String outputFormatClassName;
+ private final String serdeClassName;
+
+ /** HCat-specific properties set at the partition */
+ private final Properties hcatProperties;
+
+ /** The data location. */
+ private final String location;
+
+ /** The map of partition key names and their values. */
+ private Map<String, String> partitionValues;
+
+ /** Job properties associated with this partition */
+ Map<String, String> jobProperties;
+
+ /** the table info associated with this partition */
+ HCatTableInfo tableInfo;
+
+ /**
+ * Instantiates a new hcat partition info.
+ * @param partitionSchema the partition schema
+ * @param storageHandler the storage handler
+ * @param location the location
+ * @param hcatProperties hcat-specific properties at the partition
+ * @param jobProperties the job properties
+ * @param tableInfo the table information
+ */
+ public PartInfo(HCatSchema partitionSchema, HCatStorageHandler storageHandler,
+ String location, Properties hcatProperties,
+ Map<String, String> jobProperties, HCatTableInfo tableInfo) {
+ this.partitionSchema = partitionSchema;
+ this.location = location;
+ this.hcatProperties = hcatProperties;
+ this.jobProperties = jobProperties;
+ this.tableInfo = tableInfo;
+
+ this.storageHandlerClassName = storageHandler.getClass().getName();
+ this.inputFormatClassName = storageHandler.getInputFormatClass().getName();
+ this.serdeClassName = storageHandler.getSerDeClass().getName();
+ this.outputFormatClassName = storageHandler.getOutputFormatClass().getName();
+ }
+
+ /**
+ * Gets the value of partitionSchema.
+ * @return the partitionSchema
+ */
+ public HCatSchema getPartitionSchema() {
+ return partitionSchema;
+ }
+
+ /**
+ * @return the storage handler class name
+ */
+ public String getStorageHandlerClassName() {
+ return storageHandlerClassName;
+ }
+
+ /**
+ * @return the inputFormatClassName
+ */
+ public String getInputFormatClassName() {
+ return inputFormatClassName;
+ }
+
+ /**
+ * @return the outputFormatClassName
+ */
+ public String getOutputFormatClassName() {
+ return outputFormatClassName;
+ }
+
+ /**
+ * @return the serdeClassName
+ */
+ public String getSerdeClassName() {
+ return serdeClassName;
+ }
+
+ /**
+ * Gets the input storage handler properties.
+ * @return HCat-specific properties set at the partition
+ */
+ public Properties getInputStorageHandlerProperties() {
+ return hcatProperties;
+ }
+
+ /**
+ * Gets the value of location.
+ * @return the location
+ */
+ public String getLocation() {
+ return location;
+ }
+
+ /**
+ * Sets the partition values.
+ * @param partitionValues the new partition values
+ */
+ public void setPartitionValues(Map<String, String> partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ /**
+ * Gets the partition values.
+ * @return the partition values
+ */
+ public Map<String, String> getPartitionValues() {
+ return partitionValues;
+ }
+
+ /**
+ * Gets the job properties.
+ * @return a map of the job properties
+ */
+ public Map<String, String> getJobProperties() {
+ return jobProperties;
+ }
+
+ /**
+ * Gets the HCatalog table information.
+ * @return the table information
+ */
+ public HCatTableInfo getTableInfo() {
+ return tableInfo;
+ }
+}
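
For illustration only (a sketch, not part of the committed patch): a small helper showing the fields a task would typically read back from a deserialized PartInfo. It uses only the public getters defined above; the class and logger names are arbitrary, and it is placed in the same package purely for convenience.

// Editor's sketch: inspecting a PartInfo handed to an input split.
package org.apache.hcatalog.mapreduce;

import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartInfoDumper {
  private static final Logger LOG = LoggerFactory.getLogger(PartInfoDumper.class);

  static void dump(PartInfo partInfo) {
    LOG.info("location: {}", partInfo.getLocation());
    LOG.info("input format: {}", partInfo.getInputFormatClassName());
    LOG.info("serde: {}", partInfo.getSerdeClassName());
    Map<String, String> values = partInfo.getPartitionValues();
    if (values != null) {
      for (Map.Entry<String, String> entry : values.entrySet()) {
        LOG.info("partition key {} = {}", entry.getKey(), entry.getValue());
      }
    }
  }
}
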
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/ProgressReporter.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+class ProgressReporter extends StatusReporter implements Reporter {
+
+ private TaskInputOutputContext context = null;
+ private TaskAttemptContext taskAttemptContext = null;
+
+ public ProgressReporter(TaskAttemptContext context) {
+ if (context instanceof TaskInputOutputContext) {
+ this.context = (TaskInputOutputContext) context;
+ } else {
+ taskAttemptContext = context;
+ }
+ }
+
+ @Override
+ public void setStatus(String status) {
+ if (context != null) {
+ context.setStatus(status);
+ }
+ }
+
+ @Override
+ public Counters.Counter getCounter(Enum<?> name) {
+ return (context != null) ? (Counters.Counter) context.getCounter(name) : null;
+ }
+
+ @Override
+ public Counters.Counter getCounter(String group, String name) {
+ return (context != null) ? (Counters.Counter) context.getCounter(group, name) : null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+ if (context != null) {
+ context.getCounter(key).increment(amount);
+ }
+ }
+
+ @Override
+ public void incrCounter(String group, String counter, long amount) {
+ if (context != null) {
+ context.getCounter(group, counter).increment(amount);
+ }
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ public float getProgress() {
+ /* Required to build against 0.23 Reporter and StatusReporter. */
+ /* TODO: determine the progress. */
+ return 0.0f;
+ }
+
+ @Override
+ public void progress() {
+ if (context != null) {
+ context.progress();
+ } else {
+ taskAttemptContext.progress();
+ }
+ }
+}
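
For illustration only (a sketch, not part of the committed patch): how code in this package can wrap a new-API TaskAttemptContext so that old-API (org.apache.hadoop.mapred) components, which expect a Reporter, can still report status, counters and progress. The class and counter names are hypothetical.

// Editor's sketch: adapting a mapreduce context to the mapred Reporter interface.
package org.apache.hcatalog.mapreduce;

import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class ProgressReporterUsage {
  static void report(TaskAttemptContext context, long processed) {
    Reporter reporter = new ProgressReporter(context);
    reporter.setStatus("processed " + processed + " records");
    // Counter updates are a no-op unless a TaskInputOutputContext was passed in.
    reporter.incrCounter("hcat", "records_written", 1L);
    reporter.progress(); // keeps the task from being marked as unresponsive
  }
}
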
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * This class will contain an implementation of a RecordWriter.
+ * See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class RecordWriterContainer extends RecordWriter<WritableComparable<?>, HCatRecord> {
+
+ private final org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter;
+
+ /**
+ * @param context current JobContext
+ * @param baseRecordWriter RecordWriter that this instance will contain
+ */
+ public RecordWriterContainer(TaskAttemptContext context,
+ org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) {
+ this.baseRecordWriter = baseRecordWriter;
+ }
+
+ /**
+ * @return underlying RecordWriter
+ */
+ public org.apache.hadoop.mapred.RecordWriter getBaseRecordWriter() {
+ return baseRecordWriter;
+ }
+
+}
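
For illustration only (a sketch, not part of the committed patch): the smallest useful RecordWriterContainer, delegating straight to the wrapped old-API writer. The concrete containers added elsewhere in this change typically also run each HCatRecord through the partition's SerDe before writing; that step is omitted here for brevity, and the class name is hypothetical.

// Editor's sketch: a pass-through RecordWriterContainer.
package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.data.HCatRecord;

class PassThroughRecordWriterContainer extends RecordWriterContainer {

  public PassThroughRecordWriterContainer(TaskAttemptContext context,
      org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter) {
    super(context, baseWriter);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void write(WritableComparable<?> key, HCatRecord value) throws IOException {
    getBaseRecordWriter().write(key, value); // unchecked: the base writer is exposed as a raw type
  }

  @Override
  public void close(TaskAttemptContext context) throws IOException {
    getBaseRecordWriter().close(new ProgressReporter(context)); // old-API close() needs a Reporter
  }
}
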
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hcatalog/mapreduce/Security.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class Security {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HCatOutputFormat.class);
+
+ // making sure this is not initialized unless needed
+ private static final class LazyHolder {
+ public static final Security INSTANCE = new Security();
+ }
+
+ public static Security getInstance() {
+ return LazyHolder.INSTANCE;
+ }
+
+ boolean isSecurityEnabled() {
+ try {
+ Method m = UserGroupInformation.class.getMethod("isSecurityEnabled");
+ return (Boolean) m.invoke(null, (Object[]) null);
+ } catch (NoSuchMethodException e) {
+ LOG.info("Security is not supported by this version of hadoop.", e);
+ } catch (InvocationTargetException e) {
+ String msg = "Failed to call isSecurityEnabled()";
+ LOG.info(msg, e);
+ throw new IllegalStateException(msg, e);
+ } catch (IllegalAccessException e) {
+ String msg = "Failed to call isSecurityEnabled()";
+ LOG.info(msg, e);
+ throw new IllegalStateException(msg, e);
+ }
+ return false;
+ }
+
+ // a signature string to associate with a HCatTableInfo - essentially
+ // a concatenation of dbname, tablename and partition keyvalues.
+ String getTokenSignature(OutputJobInfo outputJobInfo) {
+ StringBuilder result = new StringBuilder("");
+ String dbName = outputJobInfo.getDatabaseName();
+ if (dbName != null) {
+ result.append(dbName);
+ }
+ String tableName = outputJobInfo.getTableName();
+ if (tableName != null) {
+ result.append("." + tableName);
+ }
+ Map<String, String> partValues = outputJobInfo.getPartitionValues();
+ if (partValues != null) {
+ for (Entry<String, String> entry : partValues.entrySet()) {
+ result.append("/");
+ result.append(entry.getKey());
+ result.append("=");
+ result.append(entry.getValue());
+ }
+
+ }
+ return result.toString();
+ }
+
+ void handleSecurity(
+ Credentials credentials,
+ OutputJobInfo outputJobInfo,
+ HiveMetaStoreClient client,
+ Configuration conf,
+ boolean harRequested)
+ throws IOException, MetaException, TException, Exception {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // check if oozie has set up a hcat deleg. token - if so use it
+ TokenSelector<? extends TokenIdentifier> hiveTokenSelector = new DelegationTokenSelector();
+ //Oozie does not change the service field of the token,
+ //hence by default token generation will have a value of "new Text("")".
+ //HiveClient will use TokenSelector.selectToken() with the service
+ //set to an empty "Text" if the hive.metastore.token.signature property is set to null.
+ Token<? extends TokenIdentifier> hiveToken = hiveTokenSelector.selectToken(
+ new Text(), ugi.getTokens());
+ if (hiveToken == null) {
+ // we did not get a token set up by oozie, so let's get one ourselves here.
+ // we essentially get a token per unique output HCatTableInfo - this is
+ // done because, through Pig, the setOutput() method is called multiple times.
+ // We want to get the token only once per unique output HCatTableInfo -
+ // we cannot just get one token, since in the multi-query case (> 1 store in 1 job)
+ // or the case when a single pig script results in > 1 jobs, the single
+ // token would get cancelled by the output committer and the subsequent
+ // stores would fail - by tying the token to the concatenation of
+ // dbname, tablename and partition keyvalues of the output
+ // TableInfo, we can have as many tokens as there are stores, and the TokenSelector
+ // will correctly pick the right tokens, which the committer will use and
+ // cancel.
+ String tokenSignature = getTokenSignature(outputJobInfo);
+ // get delegation tokens from the hcat server and store them into the "job".
+ // These will be used to publish partitions to
+ // hcat, normally in OutputCommitter.commitJob().
+ // When the JobTracker in Hadoop MapReduce starts supporting renewal of
+ // arbitrary tokens, the renewer should be the principal of the JobTracker.
+ hiveToken = HCatUtil.extractThriftToken(client.getDelegationToken(ugi.getUserName()), tokenSignature);
+
+ if (harRequested) {
+ TokenSelector<? extends TokenIdentifier> jtTokenSelector =
+ new org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSelector();
+ Token jtToken = jtTokenSelector.selectToken(org.apache.hadoop.security.SecurityUtil.buildTokenService(
+ HCatHadoopShims.Instance.get().getResourceManagerAddress(conf)), ugi.getTokens());
+ if (jtToken == null) {
+ //we don't need to cancel this token as the TokenRenewer for JT tokens
+ //takes care of cancelling them
+ credentials.addToken(
+ new Text("hcat jt token"),
+ HCatUtil.getJobTrackerDelegationToken(conf, ugi.getUserName())
+ );
+ }
+ }
+
+ credentials.addToken(new Text(ugi.getUserName() + "_" + tokenSignature), hiveToken);
+ // this will be used by the outputcommitter to pass on to the metastore client
+ // which in turn will pass on to the TokenSelector so that it can select
+ // the right token.
+ conf.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+ }
+ }
+ }
+
+ void handleSecurity(
+ Job job,
+ OutputJobInfo outputJobInfo,
+ HiveMetaStoreClient client,
+ Configuration conf,
+ boolean harRequested)
+ throws IOException, MetaException, TException, Exception {
+ handleSecurity(job.getCredentials(), outputJobInfo, client, conf, harRequested);
+ }
+
+ // we should cancel the hcat token if it was acquired by hcat
+ // and not if it was supplied (i.e. by Oozie). In the latter
+ // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set.
+ void cancelToken(HiveMetaStoreClient client, JobContext context) throws IOException, MetaException {
+ String tokenStrForm = client.getTokenStrForm();
+ if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ try {
+ client.cancelDelegationToken(tokenStrForm);
+ } catch (TException e) {
+ String msg = "Failed to cancel delegation token";
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+ }
+
+}
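
For illustration only (a sketch, not part of the committed patch): what getTokenSignature() produces for a hypothetical partitioned write. The database, table and partition values are made up; the expected output follows directly from the concatenation logic above.

// Editor's sketch: the token signature for one db/table/partition combination.
package org.apache.hcatalog.mapreduce;

import java.util.LinkedHashMap;
import java.util.Map;

class TokenSignatureExample {
  public static void main(String[] args) {
    Map<String, String> partitionValues = new LinkedHashMap<String, String>();
    partitionValues.put("ds", "20130906");
    OutputJobInfo info = OutputJobInfo.create("mydb", "web_logs", partitionValues);
    // dbname, ".", tablename, then "/key=value" per partition entry:
    String signature = Security.getInstance().getTokenSignature(info);
    System.out.println(signature); // prints: mydb.web_logs/ds=20130906
  }
}
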