Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2011/04/12 17:30:12 UTC
svn commit: r1091509 [4/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/
src/docs/ src/docs/src/ src/docs/src/documentation/
src/docs/src/documentation/classes/ src/docs/src/documentation/conf/
src/docs/src/documentation/content/ src/docs/src/docum...
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The InputFormat to use to read data from HCatalog. */
+public class HCatInputFormat extends InputFormat<WritableComparable, HCatRecord> {
+
+ /**
+ * Set the input to use for the Job. This queries the metadata server with
+ * the specified partition predicates, gets the matching partitions, and puts
+ * the information in the job configuration. The inputInfo object is updated with
+ * information needed in the client context.
+ * @param job the job object
+ * @param inputInfo the table input info
+ * @throws IOException on errors communicating with the metadata server
+ */
+ public static void setInput(Job job,
+ HCatTableInfo inputInfo) throws IOException {
+ try {
+ InitializeInput.setInput(job, inputInfo);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Set the schema for the HCatRecord data returned by HCatInputFormat.
+ * @param job the job object
+ * @param hcatSchema the schema to use as the consolidated schema
+ * @throws Exception if the schema could not be serialized into the job configuration
+ */
+ public static void setOutputSchema(Job job, HCatSchema hcatSchema) throws Exception {
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+ }
+
+
+ /**
+ * Logically split the set of input files for the job. Returns the
+ * underlying InputFormat's splits.
+ * @param jobContext the job context object
+ * @return the splits, each an HCatSplit wrapper over the corresponding
+ * storage driver InputSplit
+ * @throws IOException on errors getting the splits from the underlying InputFormat
+ * @throws InterruptedException if the underlying InputFormat is interrupted
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+
+ //Get the job info from the configuration,
+ //throws exception if not initialized
+ JobInfo jobInfo;
+ try {
+ jobInfo = getJobInfo(jobContext);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ List<PartInfo> partitionInfoList = jobInfo.getPartitions();
+ if(partitionInfoList == null ) {
+ //No partitions match the specified partition filter
+ return splits;
+ }
+
+ //For each matching partition, call getSplits on the underlying InputFormat
+ for(PartInfo partitionInfo : partitionInfoList) {
+ Job localJob = new Job(jobContext.getConfiguration());
+ HCatInputStorageDriver storageDriver;
+ try {
+ storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ //Pass all required information to the storage driver
+ initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema());
+
+ //Get the input format for the storage driver
+ InputFormat inputFormat =
+ storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+
+ //Call getSplits on the storage driver's InputFormat, and create an
+ //HCatSplit for each underlying split
+ List<InputSplit> baseSplits = inputFormat.getSplits(localJob);
+
+ for(InputSplit split : baseSplits) {
+ splits.add(new HCatSplit(
+ partitionInfo,
+ split,
+ jobInfo.getTableSchema()));
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * Create the RecordReader for the given InputSplit. Returns the underlying
+ * RecordReader if the required operations are supported and the schema matches
+ * the HCat table schema. Returns an HCatRecordReader if operations need to
+ * be implemented in HCat.
+ * @param split the split
+ * @param taskContext the task attempt context
+ * @return the record reader instance, either an HCatRecordReader (later) or
+ * the underlying storage driver's RecordReader
+ * @throws IOException on errors creating the record reader
+ * @throws InterruptedException if interrupted while creating the record reader
+ */
+ @Override
+ public RecordReader<WritableComparable, HCatRecord> createRecordReader(InputSplit split,
+ TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+ HCatSplit howlSplit = (HCatSplit) split;
+ PartInfo partitionInfo = howlSplit.getPartitionInfo();
+
+ //If running through a Pig job, the JobInfo will not be available in the
+ //backend process context (since HCatLoader works on a copy of the JobContext and does
+ //not call HCatInputFormat.setInput in the backend process).
+ //So this function should NOT attempt to read the JobInfo.
+
+ HCatInputStorageDriver storageDriver;
+ try {
+ storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ //Pass all required information to the storage driver
+ initStorageDriver(storageDriver, taskContext, partitionInfo, howlSplit.getTableSchema());
+
+ //Get the input format for the storage driver
+ InputFormat inputFormat =
+ storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+
+ //Create the underlying input format's record reader and the HCat wrapper
+ RecordReader recordReader =
+ inputFormat.createRecordReader(howlSplit.getBaseSplit(), taskContext);
+
+ return new HCatRecordReader(storageDriver,recordReader);
+ }
+
+ /**
+ * Gets the HCat table schema for the table specified in the HCatInputFormat.setInput call
+ * on the specified job context. This information is available only after HCatInputFormat.setInput
+ * has been called for a JobContext.
+ * @param context the context
+ * @return the table schema
+ * @throws Exception if HCatInputFormat.setInput has not been called for the current context
+ */
+ public static HCatSchema getTableSchema(JobContext context) throws Exception {
+ JobInfo jobInfo = getJobInfo(context);
+ return jobInfo.getTableSchema();
+ }
+
+ /**
+ * Gets the JobInfo object by reading the Configuration and deserializing
+ * the string. If the JobInfo is not present in the configuration, throws an
+ * exception since that means HCatInputFormat.setInput has not been called.
+ * @param jobContext the job context
+ * @return the JobInfo object
+ * @throws Exception if the job information is not present in the configuration
+ */
+ private static JobInfo getJobInfo(JobContext jobContext) throws Exception {
+ String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+ if( jobString == null ) {
+ throw new Exception("job information not found in JobContext. HowlInputFormat.setInput() not called?");
+ }
+
+ return (JobInfo) HCatUtil.deserialize(jobString);
+ }
+
+
+ /**
+ * Initializes the storage driver instance. Passes on the required
+ * schema information, path info and arguments for the supported
+ * features to the storage driver.
+ * @param storageDriver the storage driver
+ * @param context the job context
+ * @param partitionInfo the partition info
+ * @param tableSchema the table level schema
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ private void initStorageDriver(HCatInputStorageDriver storageDriver,
+ JobContext context, PartInfo partitionInfo,
+ HCatSchema tableSchema) throws IOException {
+
+ storageDriver.setInputPath(context, partitionInfo.getLocation());
+
+ if( partitionInfo.getPartitionSchema() != null ) {
+ storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
+ }
+
+ storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues());
+
+ //Set the output schema. Use the schema given by user if set, otherwise use the
+ //table level schema
+ HCatSchema outputSchema = null;
+ String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+ if( outputSchemaString != null ) {
+ outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString);
+ } else {
+ outputSchema = tableSchema;
+ }
+
+ storageDriver.setOutputSchema(context, outputSchema);
+
+ storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties());
+ }
+
+ /**
+ * Gets the input driver instance.
+ * @param inputStorageDriverClass the input storage driver classname
+ * @return the input driver instance
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ private HCatInputStorageDriver getInputDriverInstance(
+ String inputStorageDriverClass) throws Exception {
+ try {
+ Class<? extends HCatInputStorageDriver> driverClass =
+ (Class<? extends HCatInputStorageDriver>)
+ Class.forName(inputStorageDriverClass);
+ return driverClass.newInstance();
+ } catch(Exception e) {
+ throw new Exception("error creating storage driver " +
+ inputStorageDriverClass, e);
+ }
+ }
+
+}
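For context, a minimal driver-side sketch of reading through this InputFormat (illustrative only: the metastore URI, database and table names are made up, imports and exception handling are elided, and the HCatTableInfo.getInputTableInfo factory method is an assumption - check the HCatTableInfo class added elsewhere in this commit):

    Job job = new Job(new Configuration(), "hcat-read-example");
    HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(
        "thrift://metastore.example.com:9083",  // metastore URI (example value)
        null,                                   // kerberos principal, if secure
        "default", "mytable");                  // database and table (example values)
    HCatInputFormat.setInput(job, inputInfo);
    job.setInputFormatClass(HCatInputFormat.class);
    // Optional: project to a narrower schema; the table schema is used otherwise.
    HCatInputFormat.setOutputSchema(job, HCatInputFormat.getTableSchema(job));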
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The abstract class to be implemented by underlying storage drivers to enable data access from HCatalog through
+ * HCatInputFormat.
+ */
+public abstract class HCatInputStorageDriver {
+
+ public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
+ // trivial do nothing
+ }
+
+ /**
+ * Returns the InputFormat to use with this storage driver.
+ * @param howlProperties the properties containing parameters required for initialization of the InputFormat
+ * @return the InputFormat instance
+ */
+ public abstract InputFormat<? extends WritableComparable, ? extends Writable> getInputFormat(Properties howlProperties);
+
+
+ /**
+ * Converts the key-value pair returned by the underlying record reader into the
+ * HCatRecord format usable by HCatInputFormat. Storage driver implementations
+ * should override this to convert their own value type to HCatRecord.
+ * @param baseKey the key produced by the underlying record reader
+ * @param baseValue the value to convert to an HCatRecord
+ * @return the converted HCatRecord
+ */
+ public abstract HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue) throws IOException;
+
+ /**
+ * Set the data location for the input. The default implementation works for
+ * FileInputFormat-based input formats; override this for other input formats.
+ * @param jobContext the job context object
+ * @param location the data location
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public void setInputPath(JobContext jobContext, String location) throws IOException{
+
+ // ideally we should just call FileInputFormat.setInputPaths() here - but
+ // that won't work since FileInputFormat.setInputPaths() needs
+ // a Job object instead of a JobContext which we are handed here
+
+ int length = location.length();
+ int curlyOpen = 0;
+ int pathStart = 0;
+ boolean globPattern = false;
+ List<String> pathStrings = new ArrayList<String>();
+
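+ // For illustration (hypothetical input): a location string of
+ // "/data/a,/data/{b,c}" splits into ["/data/a", "/data/{b,c}"] -
+ // commas inside curly-brace globs are not treated as path separators.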
+ for (int i=0; i<length; i++) {
+ char ch = location.charAt(i);
+ switch(ch) {
+ case '{' : {
+ curlyOpen++;
+ if (!globPattern) {
+ globPattern = true;
+ }
+ break;
+ }
+ case '}' : {
+ curlyOpen--;
+ if (curlyOpen == 0 && globPattern) {
+ globPattern = false;
+ }
+ break;
+ }
+ case ',' : {
+ if (!globPattern) {
+ pathStrings.add(location.substring(pathStart, i));
+ pathStart = i + 1 ;
+ }
+ break;
+ }
+ }
+ }
+ pathStrings.add(location.substring(pathStart, length));
+
+ Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+
+ Configuration conf = jobContext.getConfiguration();
+
+ FileSystem fs = FileSystem.get(conf);
+ Path path = paths[0].makeQualified(fs);
+ StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
+ for(int i = 1; i < paths.length;i++) {
+ str.append(StringUtils.COMMA_STR);
+ path = paths[i].makeQualified(fs);
+ str.append(StringUtils.escapeString(path.toString()));
+ }
+
+ conf.set("mapred.input.dir", str.toString());
+ }
+
+ /**
+ * Set the schema of the data as originally published in HCatalog. The storage driver might validate that this matches
+ * the schema it has (like Zebra), or it can use this to create an HCatRecord matching the output schema.
+ * @param jobContext the job context object
+ * @param howlSchema the schema published in HCatalog for this data
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract void setOriginalSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException;
+
+ /**
+ * Set the consolidated schema for the HCatRecord data returned by the storage driver. All tuples returned by the RecordReader should
+ * have this schema. Nulls should be inserted for columns not present in the data.
+ * @param jobContext the job context object
+ * @param howlSchema the schema to use as the consolidated schema
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract void setOutputSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException;
+
+ /**
+ * Sets the partition key values for the current partition. The storage driver is passed these so that it
+ * can add the partition key values to the output HCatRecord if they are not present on disk.
+ * @param jobContext the job context object
+ * @param partitionValues a map from partition key name to partition value
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract void setPartitionValues(JobContext jobContext, Map<String,String> partitionValues) throws IOException;
+
+}
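For illustration, a minimal storage driver sketch over Hadoop's TextInputFormat (not part of this commit; it assumes a one-column string schema and the DefaultHCatRecord(List) constructor from org.apache.hcatalog.data):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hcatalog.data.DefaultHCatRecord;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;

    public class TextLineInputStorageDriver extends HCatInputStorageDriver {

      @Override
      public InputFormat<LongWritable, Text> getInputFormat(Properties howlProperties) {
        return new TextInputFormat();
      }

      @Override
      public HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue)
          throws IOException {
        // Wrap each text line as a single-column record.
        List<Object> fields = new ArrayList<Object>(1);
        fields.add(baseValue.toString());
        return new DefaultHCatRecord(fields);
      }

      @Override
      public void setOriginalSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException {
        // Single text column; nothing to validate in this sketch.
      }

      @Override
      public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException {
        // A full driver would pad/project records to this consolidated schema.
      }

      @Override
      public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
        // No partition columns stored in the data in this sketch.
      }
    }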
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.thrift.TException;
+
+public class HCatOutputCommitter extends OutputCommitter {
+
+ /** The underlying output committer */
+ private final OutputCommitter baseCommitter;
+
+ public HCatOutputCommitter(OutputCommitter baseCommitter) {
+ this.baseCommitter = baseCommitter;
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ if( baseCommitter != null ) {
+ baseCommitter.abortTask(context);
+ }
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ if( baseCommitter != null ) {
+ baseCommitter.commitTask(context);
+ }
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ //Guard against a null baseCommitter, consistent with setupJob/commitJob/abortJob
+ return baseCommitter != null && baseCommitter.needsTaskCommit(context);
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ if( baseCommitter != null ) {
+ baseCommitter.setupJob(context);
+ }
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ if( baseCommitter != null ) {
+ baseCommitter.setupTask(context);
+ }
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, State state) throws IOException {
+ if(baseCommitter != null) {
+ baseCommitter.abortJob(jobContext, state);
+ }
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+
+ try {
+ HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
+ jobInfo.getTableInfo().getServerUri(), jobContext.getConfiguration());
+ // cancel the deleg. tokens that were acquired for this job now that
+ // we are done - we should cancel if the tokens were acquired by
+ // HCatOutputFormat and not if they were supplied by Oozie. In the latter
+ // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+ String tokenStrForm = client.getTokenStrForm();
+ if(tokenStrForm != null && jobContext.getConfiguration().get
+ (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ client.cancelDelegationToken(tokenStrForm);
+ }
+ } catch(Exception e) {
+ if( e instanceof HCatException ) {
+ throw (HCatException) e;
+ } else {
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ }
+
+ Path src = new Path(jobInfo.getLocation());
+ FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+ fs.delete(src, true);
+ }
+
+ public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+ static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+ private static boolean getOutputDirMarking(Configuration conf) {
+ return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+ false);
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ if(baseCommitter != null) {
+ baseCommitter.commitJob(jobContext);
+ }
+ // create the _SUCCESS file if so requested.
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+ if(getOutputDirMarking(jobContext.getConfiguration())) {
+ Path outputPath = new Path(jobInfo.getLocation());
+ if (outputPath != null) {
+ FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+ if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
+ fileSys.create(filePath).close();
+ }
+ }
+ }
+ }
+ cleanupJob(jobContext);
+ }
+
+ @Override
+ public void cleanupJob(JobContext context) throws IOException {
+
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+ Configuration conf = context.getConfiguration();
+ Table table = jobInfo.getTable();
+ StorageDescriptor tblSD = table.getSd();
+ Path tblPath = new Path(tblSD.getLocation());
+ FileSystem fs = tblPath.getFileSystem(conf);
+
+ if( table.getPartitionKeys().size() == 0 ) {
+ //non partitioned table
+
+ if( baseCommitter != null ) {
+ baseCommitter.cleanupJob(context);
+ }
+
+ //Move data from the temp directory to the actual table directory.
+ //No metastore operation is required.
+ Path src = new Path(jobInfo.getLocation());
+ moveTaskOutputs(fs, src, src, tblPath);
+ fs.delete(src, true);
+ return;
+ }
+
+ HiveMetaStoreClient client = null;
+ List<String> values = null;
+ boolean partitionAdded = false;
+ HCatTableInfo tableInfo = jobInfo.getTableInfo();
+
+ try {
+ client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf);
+
+ StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
+
+ Partition partition = new Partition();
+ partition.setDbName(tableInfo.getDatabaseName());
+ partition.setTableName(tableInfo.getTableName());
+ partition.setSd(new StorageDescriptor(tblSD));
+ partition.getSd().setLocation(jobInfo.getLocation());
+
+ updateTableSchema(client, table, jobInfo.getOutputSchema());
+
+ List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ for(HCatFieldSchema fieldSchema : jobInfo.getOutputSchema().getFields()) {
+ fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+ }
+
+ partition.getSd().setCols(fields);
+
+ Map<String,String> partKVs = tableInfo.getPartitionValues();
+ //Get partition value list
+ partition.setValues(getPartitionValueList(table,partKVs));
+
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
+ params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+
+ //Copy table level hcat.* keys to the partition
+ for(Map.Entry<Object, Object> entry : storer.getProperties().entrySet()) {
+ params.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+
+ partition.setParameters(params);
+
+ // Sets permissions and group name on partition dirs.
+ FileStatus tblStat = fs.getFileStatus(tblPath);
+ String grpName = tblStat.getGroup();
+ FsPermission perms = tblStat.getPermission();
+ Path partPath = tblPath;
+ for(FieldSchema partKey : table.getPartitionKeys()){
+ partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+ fs.setPermission(partPath, perms);
+ try{
+ fs.setOwner(partPath, null, grpName);
+ } catch(AccessControlException ace){
+ // log the message before ignoring; logging is not yet built into HCatalog.
+ }
+ }
+
+ //Publish the new partition
+ client.add_partition(partition);
+ partitionAdded = true; //publish to metastore done
+
+ if( baseCommitter != null ) {
+ baseCommitter.cleanupJob(context);
+ }
+ // cancel the delegation tokens that were acquired for this job now that
+ // we are done - we should cancel only if the tokens were acquired by
+ // HCatOutputFormat and not if they were supplied by Oozie. In the latter
+ // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+ String tokenStrForm = client.getTokenStrForm();
+ if(tokenStrForm != null && context.getConfiguration().get
+ (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ client.cancelDelegationToken(tokenStrForm);
+ }
+ } catch (Exception e) {
+
+ if( partitionAdded ) {
+ try {
+ //A later step failed after the partition was published; remove it from the metastore
+ client.dropPartition(tableInfo.getDatabaseName(),
+ tableInfo.getTableName(), values);
+ } catch(Exception te) {
+ //Keep cause as the original exception
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ }
+
+ if( e instanceof HCatException ) {
+ throw (HCatException) e;
+ } else {
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+ }
+ } finally {
+ if( client != null ) {
+ client.close();
+ }
+ }
+ }
+
+ private Path constructPartialPartPath(Path partialPath, String partKey, Map<String,String> partKVs){
+
+ StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
+ sb.append("=");
+ sb.append(FileUtils.escapePathName(partKVs.get(partKey)));
+ return new Path(partialPath, sb.toString());
+ }
+
+ /**
+ * Update the table schema, adding any new columns introduced by the partition.
+ * @param client the client
+ * @param table the table
+ * @param partitionSchema the schema of the partition
+ * @throws IOException Signals that an I/O exception has occurred.
+ * @throws InvalidOperationException the invalid operation exception
+ * @throws MetaException the meta exception
+ * @throws TException the thrift exception
+ */
+ private void updateTableSchema(HiveMetaStoreClient client, Table table,
+ HCatSchema partitionSchema) throws IOException, InvalidOperationException, MetaException, TException {
+
+ List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
+
+ if( newColumns.size() != 0 ) {
+ List<FieldSchema> tableColumns = new ArrayList<FieldSchema>(table.getSd().getCols());
+ tableColumns.addAll(newColumns);
+
+ //Update table schema to add the newly added columns
+ table.getSd().setCols(tableColumns);
+ client.alter_table(table.getDbName(), table.getTableName(), table);
+ }
+ }
+
+ /**
+ * Convert the partition value map to a value list in the partition key order.
+ * @param table the table being written to
+ * @param valueMap the partition value map
+ * @return the partition value list
+ * @throws IOException
+ */
+ static List<String> getPartitionValueList(Table table, Map<String, String> valueMap) throws IOException {
+
+ if( valueMap.size() != table.getPartitionKeys().size() ) {
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+ "Table "
+ + table.getTableName() + " has " +
+ table.getPartitionKeys().size() + " partition keys, got "+
+ valueMap.size());
+ }
+
+ List<String> values = new ArrayList<String>();
+
+ for(FieldSchema schema : table.getPartitionKeys()) {
+ String value = valueMap.get(schema.getName().toLowerCase());
+
+ if( value == null ) {
+ throw new HCatException(ErrorType.ERROR_MISSING_PARTITION_KEY,
+ "Key " + schema.getName() + " of table " + table.getTableName());
+ }
+
+ values.add(value);
+ }
+
+ return values;
+ }
+
+ /**
+ * Move all of the files from the temp directory to the final location
+ * @param fs the output file system
+ * @param file the file to move
+ * @param src the source directory
+ * @param dest the target directory
+ * @throws IOException
+ */
+ private void moveTaskOutputs(FileSystem fs,
+ Path file,
+ Path src,
+ Path dest) throws IOException {
+ if (fs.isFile(file)) {
+ Path finalOutputPath = getFinalPath(file, src, dest);
+
+ if (!fs.rename(file, finalOutputPath)) {
+ if (!fs.delete(finalOutputPath, true)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+ }
+ if (!fs.rename(file, finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+ }
+ }
+ } else if(fs.getFileStatus(file).isDir()) {
+ FileStatus[] paths = fs.listStatus(file);
+ Path finalOutputPath = getFinalPath(file, src, dest);
+ fs.mkdirs(finalOutputPath);
+
+ if (paths != null) {
+ for (FileStatus path : paths) {
+ moveTaskOutputs(fs, path.getPath(), src, dest);
+ }
+ }
+ }
+ }
+
+ /**
+ * Find the final name of a given output file, given the output directory
+ * and the work directory.
+ * @param file the file to move
+ * @param src the source directory
+ * @param dest the target directory
+ * @return the final path for the specific output file
+ * @throws IOException
+ */
+ private Path getFinalPath(Path file, Path src,
+ Path dest) throws IOException {
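+ // e.g. (hypothetical paths) src=/warehouse/t/_TEMP and
+ // file=/warehouse/t/_TEMP/part-00000 relativize to "part-00000",
+ // giving /warehouse/t/part-00000 for dest=/warehouse/t.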
+ URI taskOutputUri = file.toUri();
+ URI relativePath = src.toUri().relativize(taskOutputUri);
+ if (taskOutputUri == relativePath) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Can not get the relative path: base = " +
+ src + " child = " + file);
+ }
+ if (relativePath.getPath().length() > 0) {
+ return new Path(dest, relativePath.getPath());
+ } else {
+ return dest;
+ }
+ }
+
+}
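A worked example of the path and permission handling above (hypothetical values): for a table at /warehouse/clicks with partition keys ds and region and output partition values {ds=20110412, region=us}, getPartitionValueList returns ["20110412", "us"], and constructPartialPartPath extends the path one key at a time, so the permission/group loop in cleanupJob touches, in order:

    /warehouse/clicks/ds=20110412
    /warehouse/clicks/ds=20110412/region=us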
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.thrift.TException;
+
+/** The OutputFormat to use to write data to HCatalog. The key is ignored
+ * and should be given as null. The value is the HCatRecord to write. */
+public class HCatOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
+
+ /** The directory under which data is initially written for a non-partitioned table */
+ protected static final String TEMP_DIR_NAME = "_TEMP";
+ private static Map<String, Token<DelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<DelegationTokenIdentifier>>();
+
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Set the information about the output to write for the Job. This queries the metadata server
+ * to find the StorageDriver to use for the table. Throws an error if the partition is already published.
+ * @param job the job object
+ * @param outputInfo the table output info
+ * @throws IOException on errors communicating with the metadata server
+ */
+ @SuppressWarnings("unchecked")
+ public static void setOutput(Job job, HCatTableInfo outputInfo) throws IOException {
+ HiveMetaStoreClient client = null;
+
+ try {
+
+ Configuration conf = job.getConfiguration();
+ client = createHiveClient(outputInfo.getServerUri(), conf);
+ Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName());
+
+ if( outputInfo.getPartitionValues() == null ) {
+ outputInfo.setPartitionValues(new HashMap<String, String>());
+ } else {
+ //Convert user specified map to have lower case key names
+ Map<String, String> valueMap = new HashMap<String, String>();
+ for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
+ valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+ }
+
+ outputInfo.setPartitionValues(valueMap);
+ }
+
+ //Handle duplicate publish
+ handleDuplicatePublish(job, outputInfo, client, table);
+
+ StorageDescriptor tblSD = table.getSd();
+ HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
+ StorerInfo storerInfo = InitializeInput.extractStorerInfo(tblSD,table.getParameters());
+
+ List<String> partitionCols = new ArrayList<String>();
+ for(FieldSchema schema : table.getPartitionKeys()) {
+ partitionCols.add(schema.getName());
+ }
+
+ Class<? extends HCatOutputStorageDriver> driverClass =
+ (Class<? extends HCatOutputStorageDriver>) Class.forName(storerInfo.getOutputSDClass());
+ HCatOutputStorageDriver driver = driverClass.newInstance();
+
+ String tblLocation = tblSD.getLocation();
+ String location = driver.getOutputLocation(job,
+ tblLocation, partitionCols,
+ outputInfo.getPartitionValues());
+
+ //Serialize the output info into the configuration
+ OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+ tableSchema, tableSchema, storerInfo, location, table);
+ conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+
+ Path tblPath = new Path(tblLocation);
+
+ /* Set the umask in conf such that files/dirs get created with table-dir
+ * permissions. The following three assumptions are made:
+ * 1. Actual files/dirs creation is done by RecordWriter of underlying
+ * output format. It is assumed that they use default permissions while creation.
+ * 2. Default Permissions = FsPermission.getDefault() = 777.
+ * 3. UMask is honored by underlying filesystem.
+ */
+
+ FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
+ tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
+
+ if(UserGroupInformation.isSecurityEnabled()){
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ // check if Oozie has set up an HCat delegation token - if so use it
+ TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+ // TODO: will oozie use a "service" called "oozie" - then instead of
+ // new Text() do new Text("oozie") below - if this change is made also
+ // remember to do:
+ // job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
+ // Also change code in HCatOutputCommitter.cleanupJob() to cancel the
+ // token only if token.service is not "oozie" - remove the condition of
+ // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
+ Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+ new Text(), ugi.getTokens());
+ if(token != null) {
+
+ job.getCredentials().addToken(new Text(ugi.getUserName()),token);
+
+ } else {
+
+ // we did not get a token set up by Oozie, so let's get one ourselves here.
+ // we essentially get a token per unique output HCatTableInfo - this is
+ // done because through Pig, the setOutput() method is called multiple times.
+ // We want to get the token only once per unique output HCatTableInfo -
+ // we cannot just get one token since in the multi-query case (> 1 store in 1 job),
+ // or the case when a single pig script results in > 1 jobs, the single
+ // token will get cancelled by the output committer and the subsequent
+ // stores will fail - by tying the token to the concatenation of
+ // dbname, tablename and partition keyvalues of the output
+ // TableInfo, we can have as many tokens as there are stores, and the TokenSelector
+ // will correctly pick the right tokens which the committer will use and
+ // cancel.
+ String tokenSignature = getTokenSignature(outputInfo);
+ if(tokenMap.get(tokenSignature) == null) {
+ // get delegation tokens from the HCat server and store them into the "job"
+ // These will be used in the HCatOutputCommitter to publish partitions to
+ // HCat
+ String tokenStrForm = client.getDelegationTokenWithSignature(ugi.getUserName(),
+ tokenSignature);
+ Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+ t.decodeFromUrlString(tokenStrForm);
+ tokenMap.put(tokenSignature, t);
+ }
+ job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
+ tokenMap.get(tokenSignature));
+ // this will be used by the outputcommitter to pass on to the metastore client
+ // which in turn will pass on to the TokenSelector so that it can select
+ // the right token.
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+ }
+ }
+ } catch(Exception e) {
+ if( e instanceof HCatException ) {
+ throw (HCatException) e;
+ } else {
+ throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+ }
+ } finally {
+ if( client != null ) {
+ client.close();
+ }
+ }
+ }
+
+
+ // a signature string to associate with an HCatTableInfo - essentially
+ // a concatenation of dbname, tablename and partition keyvalues.
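+ // e.g. (hypothetical values) dbname "default", table "clicks" and partition
+ // {ds=20110412} produce the signature "default+clicks+ds=20110412".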
+ private static String getTokenSignature(HCatTableInfo outputInfo) {
+ StringBuilder result = new StringBuilder("");
+ String dbName = outputInfo.getDatabaseName();
+ if(dbName != null) {
+ result.append(dbName);
+ }
+ String tableName = outputInfo.getTableName();
+ if(tableName != null) {
+ result.append("+" + tableName);
+ }
+ Map<String, String> partValues = outputInfo.getPartitionValues();
+ if(partValues != null) {
+ for(Entry<String, String> entry: partValues.entrySet()) {
+ result.append("+" + entry.getKey() + "=" + entry.getValue());
+ }
+ }
+ return result.toString();
+ }
+
+
+
+ /**
+ * Handles duplicate publish of a partition. Fails if the partition already exists.
+ * For non-partitioned tables, fails if files are present in the table directory.
+ * @param job the job
+ * @param outputInfo the output info
+ * @param client the metastore client
+ * @param table the table being written to
+ * @throws IOException
+ * @throws MetaException
+ * @throws TException
+ */
+ private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
+ HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
+ List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+ table, outputInfo.getPartitionValues());
+
+ if( table.getPartitionKeys().size() > 0 ) {
+ //For partitioned table, fail if partition is already present
+ List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
+ outputInfo.getTableName(), partitionValues, (short) 1);
+
+ if( currentParts.size() > 0 ) {
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+ }
+ } else {
+ Path tablePath = new Path(table.getSd().getLocation());
+ FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
+
+ if ( fs.exists(tablePath) ) {
+ FileStatus[] status = fs.globStatus(new Path(tablePath, "*"), hiddenFileFilter);
+
+ if( status.length > 0 ) {
+ throw new HCatException(ErrorType.ERROR_NON_EMPTY_TABLE,
+ table.getDbName() + "." + table.getTableName());
+ }
+ }
+ }
+ }
+
+ /**
+ * Set the schema for the data being written out to the partition. The
+ * table schema is used by default for the partition if this is not called.
+ * @param job the job object
+ * @param schema the schema for the data
+ */
+ public static void setSchema(final Job job, final HCatSchema schema) throws IOException {
+
+ OutputJobInfo jobInfo = getJobInfo(job);
+ Map<String,String> partMap = jobInfo.getTableInfo().getPartitionValues();
+ List<Integer> posOfPartCols = new ArrayList<Integer>();
+
+ // If partition columns occur in the data, we want to remove them.
+ // So, find the positions of the partition columns in the schema provided by the user.
+ // We also need to update the output schema with these deletions.
+
+ // Note that output storage drivers never see partition columns in the data
+ // or the schema.
+
+ HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields());
+ for(String partKey : partMap.keySet()){
+ Integer idx;
+ if((idx = schema.getPosition(partKey)) != null){
+ posOfPartCols.add(idx);
+ schemaWithoutParts.remove(schema.get(partKey));
+ }
+ }
+ HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
+ jobInfo.setPosOfPartCols(posOfPartCols);
+ jobInfo.setOutputSchema(schemaWithoutParts);
+ job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+ }
+
+ /**
+ * Gets the table schema for the table specified in the HCatOutputFormat.setOutput call
+ * on the specified job context.
+ * @param context the context
+ * @return the table schema
+ * @throws IOException if HCatOutputFormat.setOutput has not been called for the passed context
+ */
+ public static HCatSchema getTableSchema(JobContext context) throws IOException {
+ OutputJobInfo jobInfo = getJobInfo(context);
+ return jobInfo.getTableSchema();
+ }
+
+ /**
+ * Get the record writer for the job. Uses the Table's default OutputStorageDriver
+ * to get the record writer.
+ * @param context the information about the current task.
+ * @return a RecordWriter to write the output for the job.
+ * @throws IOException
+ */
+ @Override
+ public RecordWriter<WritableComparable<?>, HCatRecord>
+ getRecordWriter(TaskAttemptContext context
+ ) throws IOException, InterruptedException {
+
+ // First create the RW.
+ HCatRecordWriter rw = new HCatRecordWriter(context);
+
+ // Now set permissions and group on freshly created files.
+ OutputJobInfo info = getJobInfo(context);
+ Path workFile = rw.getStorageDriver().getWorkFilePath(context,info.getLocation());
+ Path tblPath = new Path(info.getTable().getSd().getLocation());
+ FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
+ FileStatus tblPathStat = fs.getFileStatus(tblPath);
+ fs.setPermission(workFile, tblPathStat.getPermission());
+ try{
+ fs.setOwner(workFile, null, tblPathStat.getGroup());
+ } catch(AccessControlException ace){
+ // log the message before ignoring; logging is not yet built into HCatalog.
+ }
+ return rw;
+ }
+
+ /**
+ * Check for validity of the output-specification for the job.
+ * @param context information about the job
+ * @throws IOException when output should not be attempted
+ */
+ @Override
+ public void checkOutputSpecs(JobContext context
+ ) throws IOException, InterruptedException {
+ OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
+ outputFormat.checkOutputSpecs(context);
+ }
+
+ /**
+ * Get the output committer for this output format. This is responsible
+ * for ensuring the output is committed correctly.
+ * @param context the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context
+ ) throws IOException, InterruptedException {
+ OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
+ return new HCatOutputCommitter(outputFormat.getOutputCommitter(context));
+ }
+
+
+ /**
+ * Gets the output format instance.
+ * @param context the job context
+ * @return the output format instance
+ * @throws IOException
+ */
+ private OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat(JobContext context) throws IOException {
+ OutputJobInfo jobInfo = getJobInfo(context);
+ HCatOutputStorageDriver driver = getOutputDriverInstance(context, jobInfo);
+
+ OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat =
+ driver.getOutputFormat();
+ return outputFormat;
+ }
+
+ /**
+ * Gets the OutputJobInfo object by reading the Configuration and deserializing
+ * the string. If the OutputJobInfo is not present in the configuration, throws an
+ * exception since that means HCatOutputFormat.setOutput has not been called.
+ * @param jobContext the job context
+ * @return the OutputJobInfo object
+ * @throws IOException the IO exception
+ */
+ static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
+ String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ if( jobString == null ) {
+ throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
+ }
+
+ return (OutputJobInfo) HCatUtil.deserialize(jobString);
+ }
+
+ /**
+ * Gets the output storage driver instance.
+ * @param jobContext the job context
+ * @param jobInfo the output job info
+ * @return the output driver instance
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ static HCatOutputStorageDriver getOutputDriverInstance(
+ JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
+ try {
+ Class<? extends HCatOutputStorageDriver> driverClass =
+ (Class<? extends HCatOutputStorageDriver>)
+ Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
+ HCatOutputStorageDriver driver = driverClass.newInstance();
+
+ //Initialize the storage driver
+ driver.setSchema(jobContext, jobInfo.getOutputSchema());
+ driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues());
+ driver.setOutputPath(jobContext, jobInfo.getLocation());
+
+ driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
+
+ return driver;
+ } catch(Exception e) {
+ throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e);
+ }
+ }
+
+ static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
+ HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
+
+ if( url != null ) {
+ //User specified a thrift url
+
+ hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+ hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL));
+
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, url);
+ if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+ hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+ }
+ } else {
+ //Thrift url is null, copy the hive conf into the job conf and restore it
+ //in the backend context
+
+ if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
+ conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties()));
+ } else {
+ //Copy configuration properties into the hive conf
+ Properties properties = (Properties) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+ for(Map.Entry<Object, Object> prop : properties.entrySet() ) {
+ if( prop.getValue() instanceof String ) {
+ hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+ } else if( prop.getValue() instanceof Integer ) {
+ hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue());
+ } else if( prop.getValue() instanceof Boolean ) {
+ hiveConf.setBoolean((String) prop.getKey(), (Boolean) prop.getValue());
+ } else if( prop.getValue() instanceof Long ) {
+ hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
+ } else if( prop.getValue() instanceof Float ) {
+ hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue());
+ }
+ }
+ }
+
+ }
+
+ return new HiveMetaStoreClient(hiveConf);
+ }
+
+
+}
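For context, a minimal driver-side write sketch against this OutputFormat (illustrative only: names and partition values are made up, imports and exception handling are elided, and the HCatTableInfo.getOutputTableInfo factory method is an assumption - check the HCatTableInfo class added elsewhere in this commit):

    Job job = new Job(new Configuration(), "hcat-write-example");
    Map<String, String> partitionValues = new HashMap<String, String>();
    partitionValues.put("ds", "20110412");  // example partition key/value
    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(
        "thrift://metastore.example.com:9083", null,
        "default", "mytable", partitionValues);
    HCatOutputFormat.setOutput(job, outputInfo);
    // Write with the table schema; setSchema() strips partition columns
    // if they appear in the supplied schema.
    HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
    job.setOutputFormatClass(HCatOutputFormat.class);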
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+
+/** The abstract class to be implemented by underlying storage drivers to enable data access from HCatalog through
+ * HCatOutputFormat.
+ */
+public abstract class HCatOutputStorageDriver {
+
+ /**
+ * Initialize the storage driver with specified properties, default implementation does nothing.
+ * @param context the job context object
+ * @param howlProperties the properties for the storage driver
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public void initialize(JobContext context, Properties howlProperties) throws IOException {
+ }
+
+ /**
+ * Returns the OutputFormat to use with this Storage Driver.
+ * @return the OutputFormat instance
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException;
+
+ /**
+ * Set the data location for the output.
+ * @param jobContext the job context object
+ * @param location the data location
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract void setOutputPath(JobContext jobContext, String location) throws IOException;
+
+ /**
+ * Set the schema for the data being written out.
+ * @param jobContext the job context object
+ * @param schema the data schema
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract void setSchema(JobContext jobContext, HCatSchema schema) throws IOException;
+
+ /**
+ * Sets the partition key values for the partition being written.
+ * @param jobContext the job context object
+ * @param partitionValues the partition values
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException;
+
+ /**
+ * Generate the key for the underlying OutputFormat. The value given to HCatOutputFormat is passed as the
+ * argument. The key given to HCatOutputFormat is ignored.
+ * @param value the value given to HCatOutputFormat
+ * @return a key instance
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract WritableComparable<?> generateKey(HCatRecord value) throws IOException;
+
+ /**
+ * Convert the given HCatRecord value to the actual value type.
+ * @param value the HCatRecord value to convert
+ * @return a value instance
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public abstract Writable convertValue(HCatRecord value) throws IOException;
+
+ /**
+ * Gets the location to use for the specified partition values.
+ * The storage driver can override as required.
+ * @param jobContext the job context object
+ * @param tableLocation the location of the table
+ * @param partitionCols the partition key names, in order
+ * @param partitionValues the partition values
+ * @return the location String.
+ * @throws IOException Signals that an I/O exception has occurred.
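+ * For example (illustrative values): for partitionCols [ds, region] and
+ * partitionValues {ds=20110412, region=us}, the default implementation
+ * returns tableLocation/ds=20110412/region=us.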
+ */
+ public String getOutputLocation(JobContext jobContext,
+ String tableLocation, List<String> partitionCols, Map<String, String> partitionValues) throws IOException {
+
+ if( partitionValues == null || partitionValues.size() == 0 ) {
+ return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
+ }
+
+ List<String> values = new ArrayList<String>();
+ for(String partitionCol : partitionCols) {
+ values.add(partitionValues.get(partitionCol));
+ }
+
+ String partitionLocation = FileUtils.makePartName(partitionCols, values);
+
+ Path path = new Path(tableLocation, partitionLocation);
+ return path.toString();
+ }
+
+ /** Default implementation assumes FileOutputFormat. Storage drivers wrapping
+ * other OutputFormats should override this method.
+ */
+ public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
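+ // Mirrors FileOutputFormat's side-file layout: the file lives under the
+ // committer's task work path, with a task-unique name like part-m-00000.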
+ return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part",""));
+ }
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/** The HCatalog wrapper for the underlying RecordReader; this ensures that initialize on
+ * the underlying record reader is called with the underlying split, not with the HCatSplit.
+ */
+class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
+
+ /** The underlying record reader to delegate to. */
+ private final RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader;
+
+ /** The storage driver used */
+ private final HCatInputStorageDriver storageDriver;
+
+ /**
+ * Instantiates a new HCatRecordReader.
+ * @param storageDriver the storage driver used to convert values
+ * @param baseRecordReader the base record reader
+ */
+ public HCatRecordReader(HCatInputStorageDriver storageDriver, RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader) {
+ this.baseRecordReader = baseRecordReader;
+ this.storageDriver = storageDriver;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext taskContext)
+ throws IOException, InterruptedException {
+ InputSplit baseSplit = split;
+
+ if( split instanceof HCatSplit ) {
+ baseSplit = ((HCatSplit) split).getBaseSplit();
+ }
+
+ baseRecordReader.initialize(baseSplit, taskContext);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
+ */
+ @Override
+ public WritableComparable getCurrentKey() throws IOException, InterruptedException {
+ return baseRecordReader.getCurrentKey();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
+ */
+ @Override
+ public HCatRecord getCurrentValue() throws IOException, InterruptedException {
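+ // Both the key and the value of the underlying reader may be needed to
+ // build the HCatRecord, so the storage driver receives the full pair.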
+ return storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(), baseRecordReader.getCurrentValue());
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
+ */
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return baseRecordReader.getProgress();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
+ */
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return baseRecordReader.nextKeyValue();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordReader#close()
+ */
+ @Override
+ public void close() throws IOException {
+ baseRecordReader.close();
+ }
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+
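+/** The RecordWriter returned by HCatOutputFormat. Removes any partition
+ * columns present in the record, then writes the key generated by the
+ * storage driver and the driver-converted value to the underlying RecordWriter.
+ */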
+public class HCatRecordWriter extends RecordWriter<WritableComparable<?>, HCatRecord> {
+
+ private final HCatOutputStorageDriver storageDriver;
+ private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter;
+ private final List<Integer> partColsToDel;
+
+ /**
+ * @return the storageDriver
+ */
+ public HCatOutputStorageDriver getStorageDriver() {
+ return storageDriver;
+ }
+
+ public HCatRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+
+ // If partition columns occur in data, we want to remove them.
+ partColsToDel = jobInfo.getPosOfPartCols();
+
+ if(partColsToDel == null){
+ throw new HCatException("setSchema() has not been called on " +
+ "HCatOutputFormat. Please make sure that method is called.");
+ }
+
+ this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
+ this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ baseWriter.close(context);
+ }
+
+ @Override
+ public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
+ InterruptedException {
+
+ for(Integer colToDel : partColsToDel){
+ value.remove(colToDel);
+ }
+ // The key given by the user is ignored; the storage driver generates one.
+ WritableComparable<?> generatedKey = storageDriver.generateKey(value);
+ Writable convertedValue = storageDriver.convertValue(value);
+ baseWriter.write(generatedKey, convertedValue);
+ }
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
+class HCatSplit extends InputSplit implements Writable {
+
+ /** The partition info for the split. */
+ private PartInfo partitionInfo;
+
+ /** The split returned by the underlying InputFormat split. */
+ private InputSplit baseSplit;
+
+ /** The schema for the HCatalog table */
+ private HCatSchema tableSchema;
+
+ /**
+ * Instantiates a new HCatSplit.
+ */
+ public HCatSplit() {
+ }
+
+ /**
+ * Instantiates a new HCatSplit.
+ *
+ * @param partitionInfo the partition info
+ * @param baseSplit the base split
+ * @param tableSchema the table level schema
+ */
+ public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
+ this.partitionInfo = partitionInfo;
+ this.baseSplit = baseSplit;
+ this.tableSchema = tableSchema;
+ }
+
+ /**
+ * Gets the partition info.
+ * @return the partitionInfo
+ */
+ public PartInfo getPartitionInfo() {
+ return partitionInfo;
+ }
+
+ /**
+ * Gets the underlying InputSplit.
+ * @return the baseSplit
+ */
+ public InputSplit getBaseSplit() {
+ return baseSplit;
+ }
+
+ /**
+ * Sets the table schema.
+ * @param tableSchema the new table schema
+ */
+ public void setTableSchema(HCatSchema tableSchema) {
+ this.tableSchema = tableSchema;
+ }
+
+ /**
+ * Gets the table schema.
+ * @return the table schema
+ */
+ public HCatSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
+ */
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return baseSplit.getLength();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
+ */
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return baseSplit.getLocations();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readFields(DataInput input) throws IOException {
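+ // Fields arrive in the order written by write(): the serialized PartInfo,
+ // the base split class name, the base split itself and finally the
+ // serialized table schema.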
+ String partitionInfoString = WritableUtils.readString(input);
+ partitionInfo = (PartInfo) HCatUtil.deserialize(partitionInfoString);
+
+ String baseSplitClassName = WritableUtils.readString(input);
+ InputSplit split;
+ try{
+ Class<? extends InputSplit> splitClass =
+ (Class<? extends InputSplit>) Class.forName(baseSplitClassName);
+
+ //Class.forName().newInstance() does not work if the underlying
+ //InputSplit has package visibility
+ Constructor<? extends InputSplit> constructor =
+ splitClass.getDeclaredConstructor(new Class[]{});
+ constructor.setAccessible(true);
+
+ split = constructor.newInstance();
+ // read baseSplit from input
+ ((Writable)split).readFields(input);
+ this.baseSplit = split;
+ }catch(Exception e){
+ throw new IOException("Exception from " + baseSplitClassName + " : " + e.getMessage(), e);
+ }
+
+ String tableSchemaString = WritableUtils.readString(input);
+ tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ @Override
+ public void write(DataOutput output) throws IOException {
+ String partitionInfoString = HCatUtil.serialize(partitionInfo);
+
+ // write partitionInfo into output
+ WritableUtils.writeString(output, partitionInfoString);
+
+ WritableUtils.writeString(output, baseSplit.getClass().getName());
+ Writable baseSplitWritable = (Writable)baseSplit;
+ //write baseSplit into output
+ baseSplitWritable.write(output);
+
+ //write the table schema into output
+ String tableSchemaString = HCatUtil.serialize(tableSchema);
+ WritableUtils.writeString(output, tableSchemaString);
+ }
+}
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+/**
+ *
+ * HCatTableInfo - class to communicate table information to {@link HCatInputFormat}
+ * and {@link HCatOutputFormat}
+ *
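+ * <p>Typical read-side usage, for illustration (the server URI, database and
+ * table names below are placeholder values):</p>
+ * <pre>
+ * HCatTableInfo info = HCatTableInfo.getInputTableInfo(
+ *     "thrift://metastorehost:9083", null, "default", "mytable");
+ * HCatInputFormat.setInput(job, info);
+ * </pre>
+ *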
+ */
+public class HCatTableInfo implements Serializable {
+
+
+ private static final long serialVersionUID = 1L;
+
+ public enum TableInfoType {
+ INPUT_INFO,
+ OUTPUT_INFO
+ };
+
+ private final TableInfoType tableInfoType;
+
+ /** The Metadata server uri */
+ private final String serverUri;
+
+ /** If the HCatalog server is configured to work with Hadoop security, this
+ * variable will hold the principal name of the server - this will be used
+ * in the authentication to the HCatalog server using Kerberos
+ */
+ private final String serverKerberosPrincipal;
+
+ /** The db and table names */
+ private final String dbName;
+ private final String tableName;
+
+ /** The partition filter */
+ private String filter;
+
+ /** The partition predicates to filter on, an arbitrary AND/OR filter, if used for input */
+ private final String partitionPredicates;
+
+ /** The information about the partitions matching the specified query */
+ private JobInfo jobInfo;
+
+ /** The partition values to publish to, if used for output*/
+ private Map<String, String> partitionValues;
+
+ /**
+ * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
+ * for reading data from a table.
+ * @param serverUri the Metadata server uri
+ * @param serverKerberosPrincipal If the HCatalog server is configured to
+ * work with Hadoop security, the Kerberos principal name of the server - else null
+ * The principal name should be of the form:
+ * <servicename>/_HOST@<realm> like "howl/_HOST@myrealm.com"
+ * The special string _HOST will be replaced automatically with the correct host name
+ * @param dbName the db name
+ * @param tableName the table name
+ */
+ public static HCatTableInfo getInputTableInfo(String serverUri,
+ String serverKerberosPrincipal,
+ String dbName,
+ String tableName) {
+ return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, (String) null);
+ }
+
+ /**
+ * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
+ * for reading data from a table.
+ * @param serverUri the Metadata server uri
+ * @param serverKerberosPrincipal If the HCatalog server is configured to
+ * work with Hadoop security, the Kerberos principal name of the server - else null
+ * The principal name should be of the form:
+ * <servicename>/_HOST@<realm> like "howl/_HOST@myrealm.com"
+ * The special string _HOST will be replaced automatically with the correct host name
+ * @param dbName the db name
+ * @param tableName the table name
+ * @param filter the partition filter
+ */
+ public static HCatTableInfo getInputTableInfo(String serverUri, String serverKerberosPrincipal, String dbName,
+ String tableName, String filter) {
+ return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, filter);
+ }
+
+ private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+ String dbName, String tableName, String filter) {
+ this.serverUri = serverUri;
+ this.serverKerberosPrincipal = serverKerberosPrincipal;
+ this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+ this.tableName = tableName;
+ this.partitionPredicates = null;
+ this.partitionValues = null;
+ this.tableInfoType = TableInfoType.INPUT_INFO;
+ this.filter = filter;
+ }
+ /**
+ * Initializes a new HCatTableInfo instance to be used with {@link HCatOutputFormat}
+ * for writing data to a table.
+ * @param serverUri the Metadata server uri
+ * @param serverKerberosPrincipal If the HCatalog server is configured to
+ * work with Hadoop security, the Kerberos principal name of the server - else null
+ * The principal name should be of the form:
+ * <servicename>/_HOST@<realm> like "howl/_HOST@myrealm.com"
+ * The special string _HOST will be replaced automatically with the correct host name
+ * @param dbName the db name
+ * @param tableName the table name
+ * @param partitionValues The partition values to publish to; can be null or an empty Map to
+ * indicate a write to an unpartitioned table. For partitioned tables, this map should
+ * contain keys for all partition columns with corresponding values.
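+ * <p>For example (illustrative values), to publish to partition ds=20110412
+ * of a table partitioned on the single column ds:</p>
+ * <pre>
+ * Map&lt;String, String&gt; partitionValues = new HashMap&lt;String, String&gt;();
+ * partitionValues.put("ds", "20110412");
+ * HCatTableInfo info = HCatTableInfo.getOutputTableInfo(
+ *     "thrift://metastorehost:9083", null, "default", "mytable", partitionValues);
+ * </pre>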
+ */
+ public static HCatTableInfo getOutputTableInfo(String serverUri,
+ String serverKerberosPrincipal, String dbName, String tableName, Map<String, String> partitionValues){
+ return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName,
+ tableName, partitionValues);
+ }
+
+ private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+ String dbName, String tableName, Map<String, String> partitionValues){
+ this.serverUri = serverUri;
+ this.serverKerberosPrincipal = serverKerberosPrincipal;
+ this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+ this.tableName = tableName;
+ this.partitionPredicates = null;
+ this.partitionValues = partitionValues;
+ this.tableInfoType = TableInfoType.OUTPUT_INFO;
+ }
+
+ /**
+ * Gets the value of serverUri
+ * @return the serverUri
+ */
+ public String getServerUri() {
+ return serverUri;
+ }
+
+ /**
+ * Gets the value of dbName
+ * @return the dbName
+ */
+ public String getDatabaseName() {
+ return dbName;
+ }
+
+ /**
+ * Gets the value of tableName
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * Gets the value of partitionPredicates
+ * @return the partitionPredicates
+ */
+ public String getPartitionPredicates() {
+ return partitionPredicates;
+ }
+
+ /**
+ * Gets the value of partitionValues
+ * @return the partitionValues
+ */
+ public Map<String, String> getPartitionValues() {
+ return partitionValues;
+ }
+
+ /**
+ * Gets the value of job info
+ * @return the job info
+ */
+ public JobInfo getJobInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * Sets the value of jobInfo
+ * @param jobInfo the jobInfo to set
+ */
+ public void setJobInfo(JobInfo jobInfo) {
+ this.jobInfo = jobInfo;
+ }
+
+ public TableInfoType getTableType(){
+ return this.tableInfoType;
+ }
+
+ /**
+ * Sets the value of partitionValues
+ * @param partitionValues the partition values to set
+ */
+ void setPartitionValues(Map<String, String> partitionValues) {
+ this.partitionValues = partitionValues;
+ }
+
+ /**
+ * Gets the value of partition filter
+ * @return the filter string
+ */
+ public String getFilter() {
+ return filter;
+ }
+
+ /**
+ * @return the serverKerberosPrincipal
+ */
+ public String getServerKerberosPrincipal() {
+ return serverKerberosPrincipal;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31*result + (serverUri == null ? 0 : serverUri.hashCode());
+ result = 31*result + (serverKerberosPrincipal == null ? 0 : serverKerberosPrincipal.hashCode());
+ result = 31*result + (dbName == null? 0 : dbName.hashCode());
+ result = 31*result + tableName.hashCode();
+ result = 31*result + (filter == null? 0 : filter.hashCode());
+ result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
+ result = 31*result + tableInfoType.ordinal();
+ result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+ return result;
+
+}
+