You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ga...@apache.org on 2011/04/12 17:30:12 UTC

svn commit: r1091509 [4/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/ src/docs/ src/docs/src/ src/docs/src/documentation/ src/docs/src/documentation/classes/ src/docs/src/documentation/conf/ src/docs/src/documentation/content/ src/docs/src/docum...

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The InputFormat to use to read data from Howl */
+public class HCatInputFormat extends InputFormat<WritableComparable, HCatRecord> {
+
+  /**
+   * Set the input to use for the Job. This queries the metadata server with
+   * the specified partition predicates, gets the matching partitions, puts
+   * the information in the conf object. The inputInfo object is updated with
+   * information needed in the client context
+   * @param job the job object
+   * @param inputInfo the table input info
+   * @throws IOException the exception in communicating with the metadata server
+   */
+  public static void setInput(Job job,
+      HCatTableInfo inputInfo) throws IOException {
+    try {
+      InitializeInput.setInput(job, inputInfo);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Set the schema for the HowlRecord data returned by HowlInputFormat.
+   * @param job the job object
+   * @param hcatSchema the schema to use as the consolidated schema
+   */
+  public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception {
+    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+  }
+
+
+  /**
+   * Logically split the set of input files for the job. Returns the
+   * underlying InputFormat's splits
+   * @param jobContext the job context object
+   * @return the splits, an HowlInputSplit wrapper over the storage
+   *         driver InputSplits
+   * @throws IOException or InterruptedException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+  throws IOException, InterruptedException {
+
+    //Get the job info from the configuration,
+    //throws exception if not initialized
+    JobInfo jobInfo;
+    try {
+      jobInfo = getJobInfo(jobContext);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<PartInfo> partitionInfoList = jobInfo.getPartitions();
+    if(partitionInfoList == null ) {
+      //No partitions match the specified partition filter
+      return splits;
+    }
+
+    //For each matching partition, call getSplits on the underlying InputFormat
+    for(PartInfo partitionInfo : partitionInfoList) {
+      Job localJob = new Job(jobContext.getConfiguration());
+      HCatInputStorageDriver storageDriver;
+      try {
+        storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      //Pass all required information to the storage driver
+      initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema());
+
+      //Get the input format for the storage driver
+      InputFormat inputFormat =
+        storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+
+      //Call getSplit on the storage drivers InputFormat, create an
+      //HCatSplit for each underlying split
+      List<InputSplit> baseSplits = inputFormat.getSplits(localJob);
+
+      for(InputSplit split : baseSplits) {
+        splits.add(new HCatSplit(
+            partitionInfo,
+            split,
+            jobInfo.getTableSchema()));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Create the RecordReader for the given InputSplit. Returns the underlying
+   * RecordReader if the required operations are supported and schema matches
+   * with HowlTable schema. Returns an HowlRecordReader if operations need to
+   * be implemented in Howl.
+   * @param split the split
+   * @param taskContext the task attempt context
+   * @return the record reader instance, either an HowlRecordReader(later) or
+   *         the underlying storage driver's RecordReader
+   * @throws IOException or InterruptedException
+   */
+  @Override
+  public RecordReader<WritableComparable, HCatRecord> createRecordReader(InputSplit split,
+      TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+    HCatSplit howlSplit = (HCatSplit) split;
+    PartInfo partitionInfo = howlSplit.getPartitionInfo();
+
+    //If running through a Pig job, the JobInfo will not be available in the
+    //backend process context (since HowlLoader works on a copy of the JobContext and does
+    //not call HowlInputFormat.setInput in the backend process).
+    //So this function should NOT attempt to read the JobInfo.
+
+    HCatInputStorageDriver storageDriver;
+    try {
+      storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    //Pass all required information to the storage driver
+    initStorageDriver(storageDriver, taskContext, partitionInfo, howlSplit.getTableSchema());
+
+    //Get the input format for the storage driver
+    InputFormat inputFormat =
+      storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+
+    //Create the underlying input formats record record and an Howl wrapper
+    RecordReader recordReader =
+      inputFormat.createRecordReader(howlSplit.getBaseSplit(), taskContext);
+
+    return new HCatRecordReader(storageDriver,recordReader);
+  }
+
+  /**
+   * Gets the HowlTable schema for the table specified in the HowlInputFormat.setInput call
+   * on the specified job context. This information is available only after HowlInputFormat.setInput
+   * has been called for a JobContext.
+   * @param context the context
+   * @return the table schema
+   * @throws Exception if HowlInputFromat.setInput has not been called for the current context
+   */
+  public static HCatSchema getTableSchema(JobContext context) throws Exception {
+    JobInfo jobInfo = getJobInfo(context);
+    return jobInfo.getTableSchema();
+  }
+
+  /**
+   * Gets the JobInfo object by reading the Configuration and deserializing
+   * the string. If JobInfo is not present in the configuration, throws an
+   * exception since that means HowlInputFormat.setInput has not been called.
+   * @param jobContext the job context
+   * @return the JobInfo object
+   * @throws Exception the exception
+   */
+  private static JobInfo getJobInfo(JobContext jobContext) throws Exception {
+    String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+    if( jobString == null ) {
+      throw new Exception("job information not found in JobContext. HowlInputFormat.setInput() not called?");
+    }
+
+    return (JobInfo) HCatUtil.deserialize(jobString);
+  }
+
+
+  /**
+   * Initializes the storage driver instance. Passes on the required
+   * schema information, path info and arguments for the supported
+   * features to the storage driver.
+   * @param storageDriver the storage driver
+   * @param context the job context
+   * @param partitionInfo the partition info
+   * @param tableSchema the table level schema
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  private void initStorageDriver(HCatInputStorageDriver storageDriver,
+      JobContext context, PartInfo partitionInfo,
+      HCatSchema tableSchema) throws IOException {
+
+    storageDriver.setInputPath(context, partitionInfo.getLocation());
+
+    if( partitionInfo.getPartitionSchema() != null ) {
+      storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
+    }
+
+    storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues());
+
+    //Set the output schema. Use the schema given by user if set, otherwise use the
+    //table level schema
+    HCatSchema outputSchema = null;
+    String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+    if( outputSchemaString != null ) {
+      outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString);
+    } else {
+      outputSchema = tableSchema;
+    }
+
+    storageDriver.setOutputSchema(context, outputSchema);
+
+    storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties());
+  }
+
+  /**
+   * Gets the input driver instance.
+   * @param inputStorageDriverClass the input storage driver classname
+   * @return the input driver instance
+   * @throws Exception
+   */
+  @SuppressWarnings("unchecked")
+  private HCatInputStorageDriver getInputDriverInstance(
+      String inputStorageDriverClass) throws Exception {
+    try {
+      Class<? extends HCatInputStorageDriver> driverClass =
+        (Class<? extends HCatInputStorageDriver>)
+        Class.forName(inputStorageDriverClass);
+      return driverClass.newInstance();
+    } catch(Exception e) {
+      throw new Exception("error creating storage driver " +
+          inputStorageDriverClass, e);
+    }
+  }
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The abstract class to be implemented by underlying storage drivers to enable data access from Howl through
+ *  HowlInputFormat.
+ */
+public abstract class HCatInputStorageDriver {
+
+  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
+    // trivial do nothing
+  }
+
+  /**
+   * Returns the InputFormat to use with this Storage Driver.
+   * @param properties the properties containing parameters required for initialization of InputFormat
+   * @return the InputFormat instance
+   */
+  public abstract InputFormat<? extends WritableComparable, ? extends Writable> getInputFormat(Properties howlProperties);
+
+
+  /**
+   * Converts to HowlRecord format usable by HowlInputFormat to convert to required valuetype.
+   * Implementers of StorageDriver should look to overwriting this function so as to convert their
+   * value type to HowlRecord. Default implementation is provided for StorageDriver implementations
+   * on top of an underlying InputFormat that already uses HowlRecord as a tuple
+   * @param value the underlying value to convert to HowlRecord
+   */
+  public abstract HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue) throws IOException;
+
+  /**
+   * Set the data location for the input.
+   * @param jobContext the job context object
+   * @param location the data location
+   * @throws IOException Signals that an I/O exception has occurred.
+   *
+   * Default implementation for FileInputFormat based Input Formats. Override
+   * this for other input formats.
+   */
+  public void setInputPath(JobContext jobContext, String location) throws IOException{
+
+    // ideally we should just call FileInputFormat.setInputPaths() here - but
+    // that won't work since FileInputFormat.setInputPaths() needs
+    // a Job object instead of a JobContext which we are handed here
+
+    int length = location.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+
+    for (int i=0; i<length; i++) {
+      char ch = location.charAt(i);
+      switch(ch) {
+      case '{' : {
+        curlyOpen++;
+        if (!globPattern) {
+          globPattern = true;
+        }
+        break;
+      }
+      case '}' : {
+        curlyOpen--;
+        if (curlyOpen == 0 && globPattern) {
+          globPattern = false;
+        }
+        break;
+      }
+      case ',' : {
+        if (!globPattern) {
+          pathStrings.add(location.substring(pathStart, i));
+          pathStart = i + 1 ;
+        }
+        break;
+      }
+      }
+    }
+    pathStrings.add(location.substring(pathStart, length));
+
+    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+
+    Configuration conf = jobContext.getConfiguration();
+
+    FileSystem fs = FileSystem.get(conf);
+    Path path = paths[0].makeQualified(fs);
+    StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
+    for(int i = 1; i < paths.length;i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = paths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+
+    conf.set("mapred.input.dir", str.toString());
+  }
+
+  /**
+   * Set the schema of the data as originally published in Howl. The storage driver might validate that this matches with
+   * the schema it has (like Zebra) or it will use this to create a HowlRecord matching the output schema.
+   * @param jobContext the job context object
+   * @param howlSchema the schema published in Howl for this data
+   * @param instantiationState
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  public abstract void setOriginalSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException;
+
+  /**
+   * Set the consolidated schema for the HowlRecord data returned by the storage driver. All tuples returned by the RecordReader should
+   * have this schema. Nulls should be inserted for columns not present in the data.
+   * @param jobContext the job context object
+   * @param howlSchema the schema to use as the consolidated schema
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  public abstract void setOutputSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException;
+
+  /**
+   * Sets the partition key values for the current partition. The storage driver is passed this so that the storage
+   * driver can add the partition key values to the output HowlRecord if the partition key values are not present on disk.
+   * @param jobContext the job context object
+   * @param partitionValues the partition values having a map with partition key name as key and the HowlKeyValue as value
+   * @param instantiationState
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  public abstract void setPartitionValues(JobContext jobContext, Map<String,String> partitionValues) throws IOException;
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.thrift.TException;
+
+public class HCatOutputCommitter extends OutputCommitter {
+
+    /** The underlying output committer */
+    private final OutputCommitter baseCommitter;
+
+    public HCatOutputCommitter(OutputCommitter baseCommitter) {
+        this.baseCommitter = baseCommitter;
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+        baseCommitter.abortTask(context);
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+        baseCommitter.commitTask(context);
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+        return baseCommitter.needsTaskCommit(context);
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      if( baseCommitter != null ) {
+        baseCommitter.setupJob(context);
+      }
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+        baseCommitter.setupTask(context);
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, State state) throws IOException {
+      if(baseCommitter != null) {
+        baseCommitter.abortJob(jobContext, state);
+      }
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+
+      try {
+        HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
+            jobInfo.getTableInfo().getServerUri(), jobContext.getConfiguration());
+        // cancel the deleg. tokens that were acquired for this job now that
+        // we are done - we should cancel if the tokens were acquired by
+        // HCatOutputFormat and not if they were supplied by Oozie. In the latter
+        // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+        String tokenStrForm = client.getTokenStrForm();
+        if(tokenStrForm != null && jobContext.getConfiguration().get
+            (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+          client.cancelDelegationToken(tokenStrForm);
+        }
+      } catch(Exception e) {
+        if( e instanceof HCatException ) {
+          throw (HCatException) e;
+        } else {
+          throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+        }
+      }
+
+      Path src = new Path(jobInfo.getLocation());
+      FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+      fs.delete(src, true);
+    }
+
+    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+    private static boolean getOutputDirMarking(Configuration conf) {
+      return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+                             false);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      if(baseCommitter != null) {
+        baseCommitter.commitJob(jobContext);
+      }
+      // create _SUCCESS FILE if so requested.
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+      if(getOutputDirMarking(jobContext.getConfiguration())) {
+        Path outputPath = new Path(jobInfo.getLocation());
+        if (outputPath != null) {
+          FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
+          // create a file in the folder to mark it
+          if (fileSys.exists(outputPath)) {
+            Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+            if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
+              fileSys.create(filePath).close();
+            }
+          }
+        }
+      }
+      cleanupJob(jobContext);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+      Configuration conf = context.getConfiguration();
+      Table table = jobInfo.getTable();
+      StorageDescriptor tblSD = table.getSd();
+      Path tblPath = new Path(tblSD.getLocation());
+      FileSystem fs = tblPath.getFileSystem(conf);
+
+      if( table.getPartitionKeys().size() == 0 ) {
+        //non partitioned table
+
+        if( baseCommitter != null ) {
+          baseCommitter.cleanupJob(context);
+        }
+
+        //Move data from temp directory the actual table directory
+        //No metastore operation required.
+        Path src = new Path(jobInfo.getLocation());
+        moveTaskOutputs(fs, src, src, tblPath);
+        fs.delete(src, true);
+        return;
+      }
+
+      HiveMetaStoreClient client = null;
+      List<String> values = null;
+      boolean partitionAdded = false;
+      HCatTableInfo tableInfo = jobInfo.getTableInfo();
+
+      try {
+        client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf);
+
+        StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
+
+        Partition partition = new Partition();
+        partition.setDbName(tableInfo.getDatabaseName());
+        partition.setTableName(tableInfo.getTableName());
+        partition.setSd(new StorageDescriptor(tblSD));
+        partition.getSd().setLocation(jobInfo.getLocation());
+
+        updateTableSchema(client, table, jobInfo.getOutputSchema());
+
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for(HCatFieldSchema fieldSchema : jobInfo.getOutputSchema().getFields()) {
+          fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+        }
+
+        partition.getSd().setCols(fields);
+
+        Map<String,String> partKVs = tableInfo.getPartitionValues();
+        //Get partition value list
+        partition.setValues(getPartitionValueList(table,partKVs));
+
+        Map<String, String> params = new HashMap<String, String>();
+        params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
+        params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+
+        //Copy table level hcat.* keys to the partition
+        for(Map.Entry<Object, Object> entry : storer.getProperties().entrySet()) {
+          params.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+
+        partition.setParameters(params);
+
+        // Sets permissions and group name on partition dirs.
+        FileStatus tblStat = fs.getFileStatus(tblPath);
+        String grpName = tblStat.getGroup();
+        FsPermission perms = tblStat.getPermission();
+        Path partPath = tblPath;
+        for(FieldSchema partKey : table.getPartitionKeys()){
+          partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+          fs.setPermission(partPath, perms);
+          try{
+            fs.setOwner(partPath, null, grpName);
+          } catch(AccessControlException ace){
+            // log the messages before ignoring. Currently, logging is not built in Hcatalog.
+          }
+        }
+
+        //Publish the new partition
+        client.add_partition(partition);
+        partitionAdded = true; //publish to metastore done
+
+        if( baseCommitter != null ) {
+          baseCommitter.cleanupJob(context);
+        }
+        // cancel the deleg. tokens that were acquired for this job now that
+        // we are done - we should cancel if the tokens were acquired by
+        // HCatOutputFormat and not if they were supplied by Oozie. In the latter
+        // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+        String tokenStrForm = client.getTokenStrForm();
+        if(tokenStrForm != null && context.getConfiguration().get
+            (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+          client.cancelDelegationToken(tokenStrForm);
+        }
+      } catch (Exception e) {
+
+        if( partitionAdded ) {
+          try {
+            //baseCommitter.cleanupJob failed, try to clean up the metastore
+            client.dropPartition(tableInfo.getDatabaseName(),
+                    tableInfo.getTableName(), values);
+          } catch(Exception te) {
+            //Keep cause as the original exception
+            throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+          }
+        }
+
+        if( e instanceof HCatException ) {
+          throw (HCatException) e;
+        } else {
+          throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+        }
+      } finally {
+        if( client != null ) {
+          client.close();
+        }
+      }
+    }
+
+    private Path constructPartialPartPath(Path partialPath, String partKey, Map<String,String> partKVs){
+
+      StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
+      sb.append("=");
+      sb.append(FileUtils.escapePathName(partKVs.get(partKey)));
+      return new Path(partialPath, sb.toString());
+    }
+
+    /**
+     * Update table schema, adding new columns as added for the partition.
+     * @param client the client
+     * @param table the table
+     * @param partitionSchema the schema of the partition
+     * @throws IOException Signals that an I/O exception has occurred.
+     * @throws InvalidOperationException the invalid operation exception
+     * @throws MetaException the meta exception
+     * @throws TException the t exception
+     */
+    private void updateTableSchema(HiveMetaStoreClient client, Table table,
+        HCatSchema partitionSchema) throws IOException, InvalidOperationException, MetaException, TException {
+
+      List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
+
+      if( newColumns.size() != 0 ) {
+        List<FieldSchema> tableColumns = new ArrayList<FieldSchema>(table.getSd().getCols());
+        tableColumns.addAll(newColumns);
+
+        //Update table schema to add the newly added columns
+        table.getSd().setCols(tableColumns);
+        client.alter_table(table.getDbName(), table.getTableName(), table);
+      }
+    }
+
+    /**
+     * Convert the partition value map to a value list in the partition key order.
+     * @param table the table being written to
+     * @param valueMap the partition value map
+     * @return the partition value list
+     * @throws IOException
+     */
+    static List<String> getPartitionValueList(Table table, Map<String, String> valueMap) throws IOException {
+
+      if( valueMap.size() != table.getPartitionKeys().size() ) {
+          throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+              "Table "
+              + table.getTableName() + " has " +
+              table.getPartitionKeys().size() + " partition keys, got "+
+              valueMap.size());
+      }
+
+      List<String> values = new ArrayList<String>();
+
+      for(FieldSchema schema : table.getPartitionKeys()) {
+        String value = valueMap.get(schema.getName().toLowerCase());
+
+        if( value == null ) {
+          throw new HCatException(ErrorType.ERROR_MISSING_PARTITION_KEY,
+              "Key " + schema.getName() + " of table " + table.getTableName());
+        }
+
+        values.add(value);
+      }
+
+      return values;
+    }
+
+    /**
+     * Move all of the files from the temp directory to the final location
+     * @param fs the output file system
+     * @param file the file to move
+     * @param src the source directory
+     * @param dest the target directory
+     * @throws IOException
+     */
+    private void moveTaskOutputs(FileSystem fs,
+                                 Path file,
+                                 Path src,
+                                 Path dest) throws IOException {
+      if (fs.isFile(file)) {
+        Path finalOutputPath = getFinalPath(file, src, dest);
+
+        if (!fs.rename(file, finalOutputPath)) {
+          if (!fs.delete(finalOutputPath, true)) {
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+          }
+          if (!fs.rename(file, finalOutputPath)) {
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+          }
+        }
+      } else if(fs.getFileStatus(file).isDir()) {
+        FileStatus[] paths = fs.listStatus(file);
+        Path finalOutputPath = getFinalPath(file, src, dest);
+        fs.mkdirs(finalOutputPath);
+
+        if (paths != null) {
+          for (FileStatus path : paths) {
+            moveTaskOutputs(fs, path.getPath(), src, dest);
+          }
+        }
+      }
+    }
+
+    /**
+     * Find the final name of a given output file, given the output directory
+     * and the work directory.
+     * @param file the file to move
+     * @param src the source directory
+     * @param dest the target directory
+     * @return the final path for the specific output file
+     * @throws IOException
+     */
+    private Path getFinalPath(Path file, Path src,
+                              Path dest) throws IOException {
+      URI taskOutputUri = file.toUri();
+      URI relativePath = src.toUri().relativize(taskOutputUri);
+      if (taskOutputUri == relativePath) {
+        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Can not get the relative path: base = " +
+            src + " child = " + file);
+      }
+      if (relativePath.getPath().length() > 0) {
+        return new Path(dest, relativePath.getPath());
+      } else {
+        return dest;
+      }
+    }
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.thrift.TException;
+
+/** The OutputFormat to use to write data to Howl. The key value is ignored and
+ * and should be given as null. The value is the HowlRecord to write.*/
+public class HCatOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
+
+    /** The directory under which data is initially written for a non partitioned table */
+    protected static final String TEMP_DIR_NAME = "_TEMP";
+    private static Map<String, Token<DelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<DelegationTokenIdentifier>>();
+
+    private static final PathFilter hiddenFileFilter = new PathFilter(){
+      public boolean accept(Path p){
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    };
+
+    /**
+     * Set the info about the output to write for the Job. This queries the metadata server
+     * to find the StorageDriver to use for the table.  Throws error if partition is already published.
+     * @param job the job object
+     * @param outputInfo the table output info
+     * @throws IOException the exception in communicating with the metadata server
+     */
+    @SuppressWarnings("unchecked")
+    public static void setOutput(Job job, HCatTableInfo outputInfo) throws IOException {
+      HiveMetaStoreClient client = null;
+
+      try {
+
+	Configuration conf = job.getConfiguration();
+        client = createHiveClient(outputInfo.getServerUri(), conf);
+        Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName());
+
+        if( outputInfo.getPartitionValues() == null ) {
+          outputInfo.setPartitionValues(new HashMap<String, String>());
+        } else {
+          //Convert user specified map to have lower case key names
+          Map<String, String> valueMap = new HashMap<String, String>();
+          for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
+            valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+          }
+
+          outputInfo.setPartitionValues(valueMap);
+        }
+
+        //Handle duplicate publish
+        handleDuplicatePublish(job, outputInfo, client, table);
+
+        StorageDescriptor tblSD = table.getSd();
+        HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
+        StorerInfo storerInfo = InitializeInput.extractStorerInfo(tblSD,table.getParameters());
+
+        List<String> partitionCols = new ArrayList<String>();
+        for(FieldSchema schema : table.getPartitionKeys()) {
+          partitionCols.add(schema.getName());
+        }
+
+        Class<? extends HCatOutputStorageDriver> driverClass =
+          (Class<? extends HCatOutputStorageDriver>) Class.forName(storerInfo.getOutputSDClass());
+        HCatOutputStorageDriver driver = driverClass.newInstance();
+
+        String tblLocation = tblSD.getLocation();
+        String location = driver.getOutputLocation(job,
+            tblLocation, partitionCols,
+            outputInfo.getPartitionValues());
+
+        //Serialize the output info into the configuration
+        OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+                tableSchema, tableSchema, storerInfo, location, table);
+        conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+
+        Path tblPath = new Path(tblLocation);
+
+        /*  Set the umask in conf such that files/dirs get created with table-dir
+         * permissions. Following three assumptions are made:
+         * 1. Actual files/dirs creation is done by RecordWriter of underlying
+         * output format. It is assumed that they use default permissions while creation.
+         * 2. Default Permissions = FsPermission.getDefault() = 777.
+         * 3. UMask is honored by underlying filesystem.
+         */
+
+        FsPermission.setUMask(conf, FsPermission.getDefault().applyUMask(
+            tblPath.getFileSystem(conf).getFileStatus(tblPath).getPermission()));
+
+        if(UserGroupInformation.isSecurityEnabled()){
+          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+          // check if oozie has set up a howl deleg. token - if so use it
+          TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
+          // TODO: will oozie use a "service" called "oozie" - then instead of
+          // new Text() do new Text("oozie") below - if this change is made also
+          // remember to do:
+          //  job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
+          // Also change code in HowlOutputCommitter.cleanupJob() to cancel the
+          // token only if token.service is not "oozie" - remove the condition of
+          // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
+          Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+              new Text(), ugi.getTokens());
+          if(token != null) {
+
+            job.getCredentials().addToken(new Text(ugi.getUserName()),token);
+
+          } else {
+
+            // we did not get token set up by oozie, let's get them ourselves here.
+            // we essentially get a token per unique Output HowlTableInfo - this is
+            // done because through Pig, setOutput() method is called multiple times
+            // We want to only get the token once per unique output HowlTableInfo -
+            // we cannot just get one token since in multi-query case (> 1 store in 1 job)
+            // or the case when a single pig script results in > 1 jobs, the single
+            // token will get cancelled by the output committer and the subsequent
+            // stores will fail - by tying the token with the concatenation of
+            // dbname, tablename and partition keyvalues of the output
+            // TableInfo, we can have as many tokens as there are stores and the TokenSelector
+            // will correctly pick the right tokens which the committer will use and
+            // cancel.
+            String tokenSignature = getTokenSignature(outputInfo);
+            if(tokenMap.get(tokenSignature) == null) {
+              // get delegation tokens from howl server and store them into the "job"
+              // These will be used in the HowlOutputCommitter to publish partitions to
+              // howl
+              String tokenStrForm = client.getDelegationTokenWithSignature(ugi.getUserName(),
+                  tokenSignature);
+              Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+              t.decodeFromUrlString(tokenStrForm);
+              tokenMap.put(tokenSignature, t);
+            }
+            job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
+                tokenMap.get(tokenSignature));
+            // this will be used by the outputcommitter to pass on to the metastore client
+            // which in turn will pass on to the TokenSelector so that it can select
+            // the right token.
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
+        }
+       }
+      } catch(Exception e) {
+        if( e instanceof HCatException ) {
+          throw (HCatException) e;
+        } else {
+          throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+        }
+      } finally {
+        if( client != null ) {
+          client.close();
+        }
+      }
+    }
+
+
+    // a signature string to associate with a HowlTableInfo - essentially
+    // a concatenation of dbname, tablename and partition keyvalues.
+    private static String getTokenSignature(HCatTableInfo outputInfo) {
+      StringBuilder result = new StringBuilder("");
+      String dbName = outputInfo.getDatabaseName();
+      if(dbName != null) {
+        result.append(dbName);
+      }
+      String tableName = outputInfo.getTableName();
+      if(tableName != null) {
+        result.append("+" + tableName);
+      }
+      Map<String, String> partValues = outputInfo.getPartitionValues();
+      if(partValues != null) {
+        for(Entry<String, String> entry: partValues.entrySet()) {
+          result.append("+" + entry.getKey() + "=" + entry.getValue());
+        }
+      }
+      return result.toString();
+    }
+
+
+
+    /**
+     * Handles duplicate publish of partition. Fails if partition already exists.
+     * For non partitioned tables, fails if files are present in table directory.
+     * @param job the job
+     * @param outputInfo the output info
+     * @param client the metastore client
+     * @param table the table being written to
+     * @throws IOException
+     * @throws MetaException
+     * @throws TException
+     */
+    private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
+        HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
+      List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+                  table, outputInfo.getPartitionValues());
+
+      if( table.getPartitionKeys().size() > 0 ) {
+        //For partitioned table, fail if partition is already present
+        List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
+            outputInfo.getTableName(), partitionValues, (short) 1);
+
+        if( currentParts.size() > 0 ) {
+          throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+        }
+      } else {
+        Path tablePath = new Path(table.getSd().getLocation());
+        FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
+
+        if ( fs.exists(tablePath) ) {
+          FileStatus[] status = fs.globStatus(new Path(tablePath, "*"), hiddenFileFilter);
+
+          if( status.length > 0 ) {
+            throw new HCatException(ErrorType.ERROR_NON_EMPTY_TABLE,
+                      table.getDbName() + "." + table.getTableName());
+          }
+        }
+      }
+    }
+
+    /**
+     * Set the schema for the data being written out to the partition. The
+     * table schema is used by default for the partition if this is not called.
+     * @param job the job object
+     * @param schema the schema for the data
+     */
+    public static void setSchema(final Job job, final HCatSchema schema) throws IOException {
+
+        OutputJobInfo jobInfo = getJobInfo(job);
+        Map<String,String> partMap = jobInfo.getTableInfo().getPartitionValues();
+        List<Integer> posOfPartCols = new ArrayList<Integer>();
+
+        // If partition columns occur in data, we want to remove them.
+        // So, find out positions of partition columns in schema provided by user.
+        // We also need to update the output Schema with these deletions.
+
+        // Note that, output storage drivers never sees partition columns in data
+        // or schema.
+
+        HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields());
+        for(String partKey : partMap.keySet()){
+          Integer idx;
+          if((idx = schema.getPosition(partKey)) != null){
+            posOfPartCols.add(idx);
+            schemaWithoutParts.remove(schema.get(partKey));
+          }
+        }
+        HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
+        jobInfo.setPosOfPartCols(posOfPartCols);
+        jobInfo.setOutputSchema(schemaWithoutParts);
+        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+    }
+
+    /**
+     * Gets the table schema for the table specified in the HowlOutputFormat.setOutput call
+     * on the specified job context.
+     * @param context the context
+     * @return the table schema
+     * @throws IOException if HowlOutputFromat.setOutput has not been called for the passed context
+     */
+    public static HCatSchema getTableSchema(JobContext context) throws IOException {
+        OutputJobInfo jobInfo = getJobInfo(context);
+        return jobInfo.getTableSchema();
+    }
+
+    /**
+     * Get the record writer for the job. Uses the Table's default OutputStorageDriver
+     * to get the record writer.
+     * @param context the information about the current task.
+     * @return a RecordWriter to write the output for the job.
+     * @throws IOException
+     */
+    @Override
+    public RecordWriter<WritableComparable<?>, HCatRecord>
+      getRecordWriter(TaskAttemptContext context
+                      ) throws IOException, InterruptedException {
+
+      // First create the RW.
+      HCatRecordWriter rw = new HCatRecordWriter(context);
+
+      // Now set permissions and group on freshly created files.
+      OutputJobInfo info =  getJobInfo(context);
+      Path workFile = rw.getStorageDriver().getWorkFilePath(context,info.getLocation());
+      Path tblPath = new Path(info.getTable().getSd().getLocation());
+      FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
+      FileStatus tblPathStat = fs.getFileStatus(tblPath);
+      fs.setPermission(workFile, tblPathStat.getPermission());
+      try{
+        fs.setOwner(workFile, null, tblPathStat.getGroup());
+      } catch(AccessControlException ace){
+        // log the messages before ignoring. Currently, logging is not built in Howl.
+      }
+      return rw;
+    }
+
+    /**
+     * Check for validity of the output-specification for the job.
+     * @param context information about the job
+     * @throws IOException when output should not be attempted
+     */
+    @Override
+    public void checkOutputSpecs(JobContext context
+                                          ) throws IOException, InterruptedException {
+        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
+        outputFormat.checkOutputSpecs(context);
+    }
+
+    /**
+     * Get the output committer for this output format. This is responsible
+     * for ensuring the output is committed correctly.
+     * @param context the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context
+                                       ) throws IOException, InterruptedException {
+        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
+        return new HCatOutputCommitter(outputFormat.getOutputCommitter(context));
+    }
+
+
+    /**
+     * Gets the output format instance.
+     * @param context the job context
+     * @return the output format instance
+     * @throws IOException
+     */
+    private OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat(JobContext context) throws IOException {
+        OutputJobInfo jobInfo = getJobInfo(context);
+        HCatOutputStorageDriver  driver = getOutputDriverInstance(context, jobInfo);
+
+        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat =
+              driver.getOutputFormat();
+        return outputFormat;
+    }
+
+    /**
+     * Gets the HowlOuputJobInfo object by reading the Configuration and deserializing
+     * the string. If JobInfo is not present in the configuration, throws an
+     * exception since that means HowlOutputFormat.setOutput has not been called.
+     * @param jobContext the job context
+     * @return the OutputJobInfo object
+     * @throws IOException the IO exception
+     */
+    static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
+        String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        if( jobString == null ) {
+            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
+        }
+
+        return (OutputJobInfo) HCatUtil.deserialize(jobString);
+    }
+
+    /**
+     * Gets the output storage driver instance.
+     * @param jobContext the job context
+     * @param jobInfo the output job info
+     * @return the output driver instance
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    static HCatOutputStorageDriver getOutputDriverInstance(
+            JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
+        try {
+            Class<? extends HCatOutputStorageDriver> driverClass =
+                (Class<? extends HCatOutputStorageDriver>)
+                Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
+            HCatOutputStorageDriver driver = driverClass.newInstance();
+
+            //Initialize the storage driver
+            driver.setSchema(jobContext, jobInfo.getOutputSchema());
+            driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues());
+            driver.setOutputPath(jobContext, jobInfo.getLocation());
+
+            driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
+
+            return driver;
+        } catch(Exception e) {
+            throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e);
+        }
+    }
+
+    static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
+      HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
+
+      if( url != null ) {
+        //User specified a thrift url
+
+        hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+        hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL));
+
+        hiveConf.set("hive.metastore.local", "false");
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, url);
+        if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+          hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+        }
+      } else {
+        //Thrift url is null, copy the hive conf into the job conf and restore it
+        //in the backend context
+
+        if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
+          conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties()));
+        } else {
+          //Copy configuration properties into the hive conf
+          Properties properties = (Properties) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+          for(Map.Entry<Object, Object> prop : properties.entrySet() ) {
+            if( prop.getValue() instanceof String ) {
+              hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+            } else if( prop.getValue() instanceof Integer ) {
+              hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue());
+            } else if( prop.getValue() instanceof Boolean ) {
+              hiveConf.setBoolean((String) prop.getKey(), (Boolean) prop.getValue());
+            } else if( prop.getValue() instanceof Long ) {
+              hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
+            } else if( prop.getValue() instanceof Float ) {
+              hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue());
+            }
+          }
+        }
+
+      }
+
+      return new HiveMetaStoreClient(hiveConf);
+    }
+
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+
+/** The abstract class to be implemented by underlying storage drivers to enable data access from Howl through
+ *  HowlOutputFormat.
+ */
+public abstract class HCatOutputStorageDriver {
+
+  /**
+   * Initialize the storage driver with specified properties, default implementation does nothing.
+   * @param context the job context object
+   * @param howlProperties the properties for the storage driver
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+    public void initialize(JobContext context, Properties howlProperties) throws IOException {
+    }
+
+    /**
+     * Returns the OutputFormat to use with this Storage Driver.
+     * @return the OutputFormat instance
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public abstract OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException;
+
+    /**
+     * Set the data location for the output.
+     * @param jobContext the job context object
+     * @param location the data location
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public abstract void setOutputPath(JobContext jobContext, String location) throws IOException;
+
+    /**
+     * Set the schema for the data being written out.
+     * @param jobContext the job context object
+     * @param schema the data schema
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public abstract void setSchema(JobContext jobContext, HCatSchema schema) throws IOException;
+
+    /**
+     * Sets the partition key values for the partition being written.
+     * @param jobContext the job context object
+     * @param partitionValues the partition values
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public abstract void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException;
+
+    /**
+     * Generate the key for the underlying outputformat. The value given to HowlOutputFormat is passed as the
+     * argument. The key given to HowlOutputFormat is ignored..
+     * @param value the value given to HowlOutputFormat
+     * @return a key instance
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public abstract WritableComparable<?> generateKey(HCatRecord value) throws IOException;
+
+    /**
+     * Convert the given HowlRecord value to the actual value type.
+     * @param value the HowlRecord value to convert
+     * @return a value instance
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public abstract Writable convertValue(HCatRecord value) throws IOException;
+
+    /**
+     * Gets the location to use for the specified partition values.
+     *  The storage driver can override as required.
+     * @param jobContext the job context object
+     * @param tableLocation the location of the table
+     * @param partitionValues the partition values
+     * @return the location String.
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public String getOutputLocation(JobContext jobContext,
+            String tableLocation, List<String> partitionCols, Map<String, String> partitionValues) throws IOException {
+
+      if( partitionValues == null || partitionValues.size() == 0 ) {
+        return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
+      }
+
+      List<String> values = new ArrayList<String>();
+      for(String partitionCol : partitionCols) {
+        values.add(partitionValues.get(partitionCol));
+      }
+
+      String partitionLocation = FileUtils.makePartName(partitionCols, values);
+
+      Path path = new Path(tableLocation, partitionLocation);
+      return path.toString();
+    }
+
+    /** Default implementation assumes FileOutputFormat. Storage drivers wrapping
+     * other OutputFormats should override this method.
+     */
+    public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
+      return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part",""));
+    }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/** The Howl wrapper for the underlying RecordReader, this ensures that the initialize on
+ * the underlying record reader is done with the underlying split, not with HowlSplit.
+ */
+class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
+
+    /** The underlying record reader to delegate to. */
+    private final RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader;
+
+    /** The storage driver used */
+    private final HCatInputStorageDriver storageDriver;
+
+    /**
+     * Instantiates a new howl record reader.
+     * @param baseRecordReader the base record reader
+     */
+    public HCatRecordReader(HCatInputStorageDriver storageDriver, RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader) {
+        this.baseRecordReader = baseRecordReader;
+        this.storageDriver = storageDriver;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext taskContext)
+    throws IOException, InterruptedException {
+        InputSplit baseSplit = split;
+
+        if( split instanceof HCatSplit ) {
+            baseSplit = ((HCatSplit) split).getBaseSplit();
+        }
+
+        baseRecordReader.initialize(baseSplit, taskContext);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
+     */
+    @Override
+    public WritableComparable getCurrentKey() throws IOException, InterruptedException {
+        return baseRecordReader.getCurrentKey();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
+     */
+    @Override
+    public HCatRecord getCurrentValue() throws IOException, InterruptedException {
+        return storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return baseRecordReader.getProgress();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        return baseRecordReader.nextKeyValue();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#close()
+     */
+    @Override
+    public void close() throws IOException {
+        baseRecordReader.close();
+    }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+
+public class HCatRecordWriter extends RecordWriter<WritableComparable<?>, HCatRecord> {
+
+    private final HCatOutputStorageDriver storageDriver;
+    /**
+     * @return the storageDriver
+     */
+    public HCatOutputStorageDriver getStorageDriver() {
+      return storageDriver;
+    }
+
+    private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter;
+    private final List<Integer> partColsToDel;
+
+    public HCatRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+
+      // If partition columns occur in data, we want to remove them.
+      partColsToDel = jobInfo.getPosOfPartCols();
+
+      if(partColsToDel == null){
+        throw new HCatException("It seems that setSchema() is not called on " +
+        		"HowlOutputFormat. Please make sure that method is called.");
+      }
+
+      this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
+      this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        baseWriter.close(context);
+    }
+
+    @Override
+    public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
+            InterruptedException {
+
+      for(Integer colToDel : partColsToDel){
+        value.remove(colToDel);
+      }
+        //The key given by user is ignored
+        WritableComparable<?> generatedKey = storageDriver.generateKey(value);
+        Writable convertedValue = storageDriver.convertValue(value);
+        baseWriter.write(generatedKey, convertedValue);
+    }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The HowlSplit wrapper around the InputSplit returned by the underlying InputFormat */
+class HCatSplit extends InputSplit implements Writable {
+
+    /** The partition info for the split. */
+    private PartInfo partitionInfo;
+
+    /** The split returned by the underlying InputFormat split. */
+    private InputSplit baseSplit;
+
+    /** The schema for the HowlTable */
+    private HCatSchema tableSchema;
+    /**
+     * Instantiates a new howl split.
+     */
+    public HCatSplit() {
+    }
+
+    /**
+     * Instantiates a new howl split.
+     *
+     * @param partitionInfo the partition info
+     * @param baseSplit the base split
+     * @param tableSchema the table level schema
+     */
+    public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
+        this.partitionInfo = partitionInfo;
+        this.baseSplit = baseSplit;
+        this.tableSchema = tableSchema;
+    }
+
+    /**
+     * Gets the partition info.
+     * @return the partitionInfo
+     */
+    public PartInfo getPartitionInfo() {
+        return partitionInfo;
+    }
+
+    /**
+     * Gets the underlying InputSplit.
+     * @return the baseSplit
+     */
+    public InputSplit getBaseSplit() {
+        return baseSplit;
+    }
+
+    /**
+     * Sets the table schema.
+     * @param tableSchema the new table schema
+     */
+    public void setTableSchema(HCatSchema tableSchema) {
+        this.tableSchema = tableSchema;
+    }
+
+    /**
+     * Gets the table schema.
+     * @return the table schema
+     */
+    public HCatSchema getTableSchema() {
+        return tableSchema;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
+     */
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return baseSplit.getLength();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
+     */
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return baseSplit.getLocations();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        String partitionInfoString = WritableUtils.readString(input);
+        partitionInfo = (PartInfo) HCatUtil.deserialize(partitionInfoString);
+
+        String baseSplitClassName = WritableUtils.readString(input);
+        InputSplit split;
+        try{
+            Class<? extends InputSplit> splitClass =
+                (Class<? extends InputSplit>) Class.forName(baseSplitClassName);
+
+            //Class.forName().newInstance() does not work if the underlying
+            //InputSplit has package visibility
+            Constructor<? extends InputSplit> constructor =
+                splitClass.getDeclaredConstructor(new Class[]{});
+            constructor.setAccessible(true);
+
+            split = constructor.newInstance();
+            // read baseSplit from input
+            ((Writable)split).readFields(input);
+            this.baseSplit = split;
+        }catch(Exception e){
+            throw new IOException ("Exception from " +baseSplitClassName + " : " + e.getMessage());
+        }
+
+        String tableSchemaString = WritableUtils.readString(input);
+        tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        String partitionInfoString = HCatUtil.serialize(partitionInfo);
+
+        // write partitionInfo into output
+        WritableUtils.writeString(output, partitionInfoString);
+
+        WritableUtils.writeString(output, baseSplit.getClass().getName());
+        Writable baseSplitWritable = (Writable)baseSplit;
+        //write  baseSplit into output
+        baseSplitWritable.write(output);
+
+        //write the table schema into output
+        String tableSchemaString = HCatUtil.serialize(tableSchema);
+        WritableUtils.writeString(output, tableSchemaString);
+    }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+
+/**
+ *
+ * HCatTableInfo - class to communicate table information to {@link HowlInputFormat}
+ * and {@link HowlOutputFormat}
+ *
+ */
+public class HCatTableInfo implements Serializable {
+
+
+  private static final long serialVersionUID = 1L;
+
+  public enum TableInfoType {
+    INPUT_INFO,
+    OUTPUT_INFO
+  };
+
+  private final TableInfoType tableInfoType;
+
+  /** The Metadata server uri */
+  private final String serverUri;
+
+  /** If the howl server is configured to work with hadoop security, this
+   * variable will hold the principal name of the server - this will be used
+   * in the authentication to the howl server using kerberos
+   */
+  private final String serverKerberosPrincipal;
+
+  /** The db and table names */
+  private final String dbName;
+  private final String tableName;
+
+  /** The partition filter */
+  private String filter;
+
+  /** The partition predicates to filter on, an arbitrary AND/OR filter, if used to input from*/
+  private final String partitionPredicates;
+
+  /** The information about the partitions matching the specified query */
+  private JobInfo jobInfo;
+
+  /** The partition values to publish to, if used for output*/
+  private Map<String, String> partitionValues;
+
+  /**
+   * Initializes a new HCatTableInfo instance to be used with {@link HowlInputFormat}
+   * for reading data from a table.
+   * @param serverUri the Metadata server uri
+   * @param serverKerberosPrincipal If the howl server is configured to
+   * work with hadoop security, the kerberos principal name of the server - else null
+   * The principal name should be of the form:
+   * <servicename>/_HOST@<realm> like "howl/_HOST@myrealm.com"
+   * The special string _HOST will be replaced automatically with the correct host name
+   * @param dbName the db name
+   * @param tableName the table name
+   */
+  public static HCatTableInfo getInputTableInfo(String serverUri,
+      String serverKerberosPrincipal,
+      String dbName,
+          String tableName) {
+    return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, (String) null);
+  }
+
+  /**
+   * Initializes a new HCatTableInfo instance to be used with {@link HowlInputFormat}
+   * for reading data from a table.
+   * @param serverUri the Metadata server uri
+   * @param serverKerberosPrincipal If the howl server is configured to
+   * work with hadoop security, the kerberos principal name of the server - else null
+   * The principal name should be of the form:
+   * <servicename>/_HOST@<realm> like "howl/_HOST@myrealm.com"
+   * The special string _HOST will be replaced automatically with the correct host name
+   * @param dbName the db name
+   * @param tableName the table name
+   * @param filter the partition filter
+   */
+  public static HCatTableInfo getInputTableInfo(String serverUri, String serverKerberosPrincipal, String dbName,
+          String tableName, String filter) {
+    return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName, tableName, filter);
+  }
+
+  private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+      String dbName, String tableName, String filter) {
+      this.serverUri = serverUri;
+      this.serverKerberosPrincipal = serverKerberosPrincipal;
+      this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+      this.tableName = tableName;
+      this.partitionPredicates = null;
+      this.partitionValues = null;
+      this.tableInfoType = TableInfoType.INPUT_INFO;
+      this.filter = filter;
+  }
+  /**
+   * Initializes a new HCatTableInfo instance to be used with {@link HowlOutputFormat}
+   * for writing data from a table.
+   * @param serverUri the Metadata server uri
+   * @param serverKerberosPrincipal If the howl server is configured to
+   * work with hadoop security, the kerberos principal name of the server - else null
+   * The principal name should be of the form:
+   * <servicename>/_HOST@<realm> like "howl/_HOST@myrealm.com"
+   * The special string _HOST will be replaced automatically with the correct host name
+   * @param dbName the db name
+   * @param tableName the table name
+   * @param partitionValues The partition values to publish to, can be null or empty Map to
+   * indicate write to a unpartitioned table. For partitioned tables, this map should
+   * contain keys for all partition columns with corresponding values.
+   */
+  public static HCatTableInfo getOutputTableInfo(String serverUri,
+          String serverKerberosPrincipal, String dbName, String tableName, Map<String, String> partitionValues){
+      return new HCatTableInfo(serverUri, serverKerberosPrincipal, dbName,
+          tableName, partitionValues);
+  }
+
+  private HCatTableInfo(String serverUri, String serverKerberosPrincipal,
+      String dbName, String tableName, Map<String, String> partitionValues){
+    this.serverUri = serverUri;
+    this.serverKerberosPrincipal = serverKerberosPrincipal;
+    this.dbName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
+    this.tableName = tableName;
+    this.partitionPredicates = null;
+    this.partitionValues = partitionValues;
+    this.tableInfoType = TableInfoType.OUTPUT_INFO;
+  }
+
+  /**
+   * Gets the value of serverUri
+   * @return the serverUri
+   */
+  public String getServerUri() {
+    return serverUri;
+  }
+
+  /**
+   * Gets the value of dbName
+   * @return the dbName
+   */
+  public String getDatabaseName() {
+    return dbName;
+  }
+
+  /**
+   * Gets the value of tableName
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * Gets the value of partitionPredicates
+   * @return the partitionPredicates
+   */
+  public String getPartitionPredicates() {
+    return partitionPredicates;
+  }
+
+  /**
+   * Gets the value of partitionValues
+   * @return the partitionValues
+   */
+  public Map<String, String> getPartitionValues() {
+    return partitionValues;
+  }
+
+  /**
+   * Gets the value of job info
+   * @return the job info
+   */
+  public JobInfo getJobInfo() {
+    return jobInfo;
+  }
+
+  /**
+   * Sets the value of jobInfo
+   * @param jobInfo the jobInfo to set
+   */
+  public void setJobInfo(JobInfo jobInfo) {
+    this.jobInfo = jobInfo;
+  }
+
+  public TableInfoType getTableType(){
+    return this.tableInfoType;
+  }
+
+  /**
+   * Sets the value of partitionValues
+   * @param partitionValues the partition values to set
+   */
+  void setPartitionValues(Map<String, String>  partitionValues) {
+    this.partitionValues = partitionValues;
+  }
+
+  /**
+   * Gets the value of partition filter
+   * @return the filter string
+   */
+  public String getFilter() {
+    return filter;
+  }
+
+  /**
+   * @return the serverKerberosPrincipal
+   */
+  public String getServerKerberosPrincipal() {
+    return serverKerberosPrincipal;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31*result + (serverUri == null ? 0 : serverUri.hashCode());
+    result = 31*result + (serverKerberosPrincipal == null ? 0 : serverKerberosPrincipal.hashCode());
+    result = 31*result + (dbName == null? 0 : dbName.hashCode());
+    result = 31*result + tableName.hashCode();
+    result = 31*result + (filter == null? 0 : filter.hashCode());
+    result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
+    result = 31*result + tableInfoType.ordinal();
+    result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+    return result;
+
+  }
+}
+