Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 15:40:06 UTC

svn commit: r1522574 - in /hive/trunk: hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/results/positive/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hiv...

Author: hashutosh
Date: Thu Sep 12 13:40:06 2013
New Revision: 1522574

URL: http://svn.apache.org/r1522574
Log:
HIVE-5260 : Introduce HivePassThroughOutputFormat that allows Hive to use general purpose OutputFormats instead of HiveOutputFormats in StorageHandlers (Viraj Bhat via Ashutosh Chauhan)
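
In outline: Hive previously required a storage handler's output format to implement HiveOutputFormat. With this change a handler may return a plain org.apache.hadoop.mapred.OutputFormat; Hive substitutes HivePassThroughOutputFormat for it, records the real class name in the job configuration, and delegates record writing back to it at run time. The sketch below shows the kind of general-purpose format this enables; the class name and the no-op bodies are illustrative only, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

// A hypothetical general-purpose output format: it implements only the plain
// mapred contract, not org.apache.hadoop.hive.ql.io.HiveOutputFormat, yet a
// storage handler can now hand it to Hive directly.
public class ExampleSinkOutputFormat implements OutputFormat<Text, Text> {

  @Override
  public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text key, Text value) throws IOException {
        // write the record to the external system here
      }

      @Override
      public void close(Reporter reporter) throws IOException {
        // flush buffers and release connections here
      }
    };
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
    // nothing to validate in this sketch
  }
}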

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
Modified:
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
    hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
    hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Thu Sep 12 13:40:06 2013
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
@@ -34,7 +35,9 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -65,6 +69,11 @@ public class HBaseStorageHandler extends
 
   final static public String DEFAULT_PREFIX = "default.";
 
+  // Tracks whether configureTableJobProperties is being called for input or
+  // for output, since the two paths set different (asymmetric) properties
+  private boolean configureInputJobProps = true;
+
+  private Configuration jobConf;
   private Configuration hbaseConf;
   private HBaseAdmin admin;
 
@@ -85,11 +94,16 @@ public class HBaseStorageHandler extends
     // for backwards compatibility with the original specs).
     String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
     if (tableName == null) {
+      //convert to lower case in case we are getting from serde
       tableName = tbl.getSd().getSerdeInfo().getParameters().get(
         HBaseSerDe.HBASE_TABLE_NAME);
+      //standardize to lower case
+      if (tableName != null) {
+        tableName = tableName.toLowerCase();
+      }
     }
     if (tableName == null) {
-      tableName = tbl.getDbName() + "." + tbl.getTableName();
+      tableName = (tbl.getDbName() + "." + tbl.getTableName()).toLowerCase();
       if (tableName.startsWith(DEFAULT_PREFIX)) {
         tableName = tableName.substring(DEFAULT_PREFIX.length());
       }
@@ -230,8 +244,13 @@ public class HBaseStorageHandler extends
     return hbaseConf;
   }
 
+  public Configuration getJobConf() {
+    return jobConf;
+  }
+
   @Override
   public void setConf(Configuration conf) {
+    jobConf = conf;
     hbaseConf = HBaseConfiguration.create(conf);
   }
 
@@ -242,7 +261,7 @@ public class HBaseStorageHandler extends
 
   @Override
   public Class<? extends OutputFormat> getOutputFormatClass() {
-    return HiveHBaseTableOutputFormat.class;
+    return org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat.class;
   }
 
   @Override
@@ -259,14 +278,18 @@ public class HBaseStorageHandler extends
   public void configureInputJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties) {
-      configureTableJobProperties(tableDesc, jobProperties);
+    //Input
+    this.configureInputJobProps = true;
+    configureTableJobProperties(tableDesc, jobProperties);
   }
 
   @Override
   public void configureOutputJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties) {
-      configureTableJobProperties(tableDesc, jobProperties);
+    //Output
+    this.configureInputJobProps = false;
+    configureTableJobProperties(tableDesc, jobProperties);
   }
 
   @Override
@@ -301,11 +324,59 @@ public class HBaseStorageHandler extends
     if (tableName == null) {
       tableName =
         tableProperties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+      tableName = tableName.toLowerCase();
       if (tableName.startsWith(DEFAULT_PREFIX)) {
         tableName = tableName.substring(DEFAULT_PREFIX.length());
       }
     }
     jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
+
+    Configuration jobConf = getJobConf();
+    addHBaseResources(jobConf, jobProperties);
+
+    // needed to reconcile HBaseStorageHandler for use in HCatalog:
+    // check whether this is an input job or an output job
+    if (this.configureInputJobProps) {
+      // input job properties
+      try {
+        HBaseConfiguration.addHbaseResources(jobConf);
+        addHBaseDelegationToken(jobConf);
+      } catch (IOException e) {
+        throw new IllegalStateException("Error while configuring input job properties", e);
+      }
+    } else {
+      // output job properties
+      Configuration copyOfConf = new Configuration(jobConf);
+      HBaseConfiguration.addHbaseResources(copyOfConf);
+      jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+    }
+  }
+
+  /**
+   * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+   * if they are not already present in the jobConf.
+   * @param jobConf Job configuration
+   * @param newJobProperties  Map to which new properties should be added
+   */
+  private void addHBaseResources(Configuration jobConf,
+      Map<String, String> newJobProperties) {
+    Configuration conf = new Configuration(false);
+    HBaseConfiguration.addHbaseResources(conf);
+    for (Entry<String, String> entry : conf) {
+      if (jobConf.get(entry.getKey()) == null) {
+        newJobProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  private void addHBaseDelegationToken(Configuration conf) throws IOException {
+    if (User.isHBaseSecurityEnabled(conf)) {
+      try {
+        User.getCurrent().obtainAuthTokenForJob(conf, new Job(conf));
+      } catch (InterruptedException e) {
+        throw new IOException("Error while obtaining hbase delegation token", e);
+      }
+    }
   }
 
   @Override
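
The addHBaseResources helper above overlays hbase-default.xml/hbase-site.xml onto the job properties, but only for keys the job configuration does not already carry. A minimal, self-contained illustration of that overlay rule (the property values are made up and the Configurations are built by hand, so no HBase installation is consulted):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class OverlayDemo {
  public static void main(String[] args) {
    Configuration jobConf = new Configuration(false);
    jobConf.set("hbase.zookeeper.quorum", "zk-from-job");  // already set on the job

    // stands in for the result of HBaseConfiguration.addHbaseResources(conf)
    Configuration hbaseResources = new Configuration(false);
    hbaseResources.set("hbase.zookeeper.quorum", "zk-from-site");  // shadowed, not copied
    hbaseResources.set("hbase.client.retries.number", "10");       // absent, so copied

    Map<String, String> newJobProperties = new HashMap<String, String>();
    for (Map.Entry<String, String> entry : hbaseResources) {
      if (jobConf.get(entry.getKey()) == null) {
        newJobProperties.put(entry.getKey(), entry.getValue());
      }
    }
    System.out.println(newJobProperties);  // prints {hbase.client.retries.number=10}
  }
}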

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Thu Sep 12 13:40:06 2013
@@ -19,27 +19,26 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
 /**
@@ -49,7 +48,6 @@ import org.apache.hadoop.util.Progressab
  */
 public class HiveHBaseTableOutputFormat extends
     TableOutputFormat<ImmutableBytesWritable> implements
-    HiveOutputFormat<ImmutableBytesWritable, Put>,
     OutputFormat<ImmutableBytesWritable, Put> {
 
   static final Log LOG = LogFactory.getLog(HiveHBaseTableOutputFormat.class);
@@ -66,39 +64,7 @@ public class HiveHBaseTableOutputFormat 
    * @param progress progress used for status report
    * @return the RecordWriter for the output file
    */
-  @Override
-  public RecordWriter getHiveRecordWriter(
-      JobConf jc,
-      Path finalOutPath,
-      Class<? extends Writable> valueClass,
-      boolean isCompressed,
-      Properties tableProperties,
-      final Progressable progressable) throws IOException {
-
-    String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
-    jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
-    final boolean walEnabled = HiveConf.getBoolVar(
-        jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
-    final HTable table = new HTable(HBaseConfiguration.create(jc), hbaseTableName);
-    table.setAutoFlush(false);
 
-    return new RecordWriter() {
-
-      @Override
-      public void close(boolean abort) throws IOException {
-        if (!abort) {
-          table.flushCommits();
-        }
-      }
-
-      @Override
-      public void write(Writable w) throws IOException {
-        Put put = (Put) w;
-        put.setWriteToWAL(walEnabled);
-        table.put(put);
-      }
-    };
-  }
 
   @Override
   public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
@@ -127,6 +93,37 @@ public class HiveHBaseTableOutputFormat 
       String name,
       Progressable progressable) throws IOException {
 
-    throw new RuntimeException("Error: Hive should not invoke this method.");
+    String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+    jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+    final boolean walEnabled = HiveConf.getBoolVar(
+        jobConf, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
+    final HTable table = new HTable(HBaseConfiguration.create(jobConf), hbaseTableName);
+    table.setAutoFlush(false);
+    return new MyRecordWriter(table, walEnabled);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new TableOutputCommitter();
+  }
+
+
+  private static class MyRecordWriter
+      implements org.apache.hadoop.mapred.RecordWriter<ImmutableBytesWritable, Put> {
+    private final HTable m_table;
+    private final boolean m_walEnabled;
+
+    public MyRecordWriter(HTable table, boolean walEnabled) {
+      m_table = table;
+      m_walEnabled = walEnabled;
+    }
+
+    public void close(Reporter reporter) throws IOException {
+      // HTable.close() flushes buffered puts before releasing resources
+      m_table.close();
+    }
+
+    public void write(ImmutableBytesWritable key,
+        Put value) throws IOException {
+      // honor hive.hbase.wal.enabled, as the removed getHiveRecordWriter did
+      Put put = new Put(value);
+      put.setWriteToWAL(m_walEnabled);
+      m_table.put(put);
+    }
   }
 }
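
With getHiveRecordWriter removed, the HBase write path now reaches this class through the pass-through wrapper instead. Roughly, as reconstructed from the pieces of this commit:

  FileSinkOperator
    -> HivePassThroughOutputFormat.getHiveRecordWriter(...)   (ql/io, added below)
       -> reflects HiveHBaseTableOutputFormat from the jobconf key
       -> HiveHBaseTableOutputFormat.getRecordWriter(...)     (this file)
          -> new MyRecordWriter(table, walEnabled)
       -> wraps the result in HivePassThroughRecordWriter     (ql/io, added below)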

Modified: hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out Thu Sep 12 13:40:06 2013
@@ -60,7 +60,7 @@ Table Parameters:	 	 
 # Storage Information	 	 
 SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseSerDe	 
 InputFormat:        	org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat	 
-OutputFormat:       	org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat	 
+OutputFormat:       	org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat	 
 Compressed:         	No                  	 
 Num Buckets:        	-1                  	 
 Bucket Columns:     	[]                  	 

Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out Thu Sep 12 13:40:06 2013
@@ -60,7 +60,7 @@ Table Parameters:	 	 
 # Storage Information	 	 
 SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseSerDe	 
 InputFormat:        	org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat	 
-OutputFormat:       	org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat	 
+OutputFormat:       	org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat	 
 Compressed:         	No                  	 
 Num Buckets:        	-1                  	 
 Bucket Columns:     	[]                  	 
@@ -231,7 +231,7 @@ Table Parameters:	 	 
 # Storage Information	 	 
 SerDe Library:      	org.apache.hadoop.hive.hbase.HBaseSerDe	 
 InputFormat:        	org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat	 
-OutputFormat:       	org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat	 
+OutputFormat:       	org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat	 
 Compressed:         	No                  	 
 Num Buckets:        	-1                  	 
 Bucket Columns:     	[]                  	 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Sep 12 13:40:06 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -926,7 +927,19 @@ public class FileSinkOperator extends Te
   public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
     if (hiveOutputFormat == null) {
       try {
-        hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+        if (getConf().getTableInfo().getJobProperties() != null) {
+          // set only for storage handlers
+          if (getConf().getTableInfo().getJobProperties()
+              .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+            job.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
+                getConf().getTableInfo().getJobProperties()
+                    .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+            hiveOutputFormat = ReflectionUtils.newInstance(
+                conf.getTableInfo().getOutputFileFormatClass(), job);
+          } else {
+            hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+          }
+        } else {
+          hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+        }
       } catch (Exception ex) {
         throw new IOException(ex);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Thu Sep 12 13:40:06 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -51,6 +53,7 @@ import org.apache.hadoop.mapred.Sequence
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * An util class for various Hive file format tasks.
@@ -70,6 +73,8 @@ public final class HiveFileFormatUtils {
         SequenceFileOutputFormat.class, HiveSequenceFileOutputFormat.class);
   }
 
+  // name of the real OutputFormat class most recently wrapped by the pass-through
+  static String realoutputFormat;
+
   @SuppressWarnings("unchecked")
   private static Map<Class<? extends OutputFormat>, Class<? extends HiveOutputFormat>>
   outputFormatSubstituteMap;
@@ -93,16 +98,45 @@ public final class HiveFileFormatUtils {
    */
   @SuppressWarnings("unchecked")
   public static synchronized Class<? extends HiveOutputFormat> getOutputFormatSubstitute(
-      Class<?> origin) {
+      Class<?> origin, boolean storagehandlerflag) {
     if (HiveOutputFormat.class.isAssignableFrom(origin)) {
       return (Class<? extends HiveOutputFormat>) origin;
     }
     Class<? extends HiveOutputFormat> result = outputFormatSubstituteMap
         .get(origin);
+    // register this output format into the map the first time it is seen
+    if (storagehandlerflag && (result == null)) {
+      HiveFileFormatUtils.setRealOutputFormatClassName(origin.getName());
+      result = HivePassThroughOutputFormat.class;
+      HiveFileFormatUtils.registerOutputFormatSubstitute(
+          (Class<? extends OutputFormat>) origin, HivePassThroughOutputFormat.class);
+    }
     return result;
   }
 
   /**
+   * Get the name of the real OutputFormat class that HivePassThroughOutputFormat wraps.
+   */
+  public static String getRealOutputFormatClassName() {
+    return realoutputFormat;
+  }
+
+  /**
+   * Set the name of the real OutputFormat class that HivePassThroughOutputFormat wraps.
+   */
+  public static void setRealOutputFormatClassName(String destination) {
+    if (destination != null) {
+      realoutputFormat = destination;
+    }
+  }
+
+
+  /**
    * get the final output path of a given FileOutputFormat.
    *
    * @param parent
@@ -215,9 +249,21 @@ public final class HiveFileFormatUtils {
   public static RecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class<? extends Writable> outputClass,
       FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
+    boolean storagehandlerofhivepassthru = false;
+    HiveOutputFormat<?, ?> hiveOutputFormat;
     try {
-      HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
-          .getOutputFileFormatClass().newInstance();
+      if (tableInfo.getJobProperties() != null
+          && tableInfo.getJobProperties()
+              .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+        jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
+            tableInfo.getJobProperties()
+                .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+        storagehandlerofhivepassthru = true;
+      }
+      if (storagehandlerofhivepassthru) {
+        hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(), jc);
+      } else {
+        hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
+      }
       boolean isCompressed = conf.getCompressed();
       JobConf jc_output = jc;
       if (isCompressed) {
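
Condensed, the substitution logic above now behaves as follows (a restatement of the code, not new behavior):

  // origin already implements HiveOutputFormat                -> origin, unchanged
  // origin has a registered substitute
  //   (e.g. SequenceFileOutputFormat)                         -> that substitute
  // storagehandlerflag set and no substitute registered yet   -> record origin's class name,
  //                                                              register and return
  //                                                              HivePassThroughOutputFormat
  // anything else                                             -> null (caller rejects it)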

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1522574&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Thu Sep 12 13:40:06 2013
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This pass-through class wraps OutputFormat implementations so that new OutputFormats
+ * not derived from HiveOutputFormat get through the checker.
+ */
+
+public class HivePassThroughOutputFormat<K, V> implements Configurable, HiveOutputFormat<K, V>{
+
+  private OutputFormat<? super WritableComparable<?>, ? super Writable> actualOutputFormat;
+  private String actualOutputFormatClass = "";
+  private Configuration conf;
+  private boolean initialized;
+  public static final String HIVE_PASSTHROUGH_OF_CLASSNAME =
+                                  "org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";
+
+  public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
+                                 "hive.passthrough.storagehandler.of"; 
+
+  public HivePassThroughOutputFormat() {
+    //construct this class through ReflectionUtils from FileSinkOperator
+    this.actualOutputFormat = null;
+    this.initialized = false;
+  }
+
+  private void createActualOF() throws IOException {
+    Class<? extends OutputFormat> cls;
+    try {
+      if (actualOutputFormatClass != null) {
+        cls =
+            (Class<? extends OutputFormat>) Class.forName(actualOutputFormatClass, true,
+                JavaUtils.getClassLoader());
+      } else {
+        throw new RuntimeException("actualOutputFormatClass is null");
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    OutputFormat<? super WritableComparable<?>, ? super Writable> actualOF =
+        (OutputFormat<? super WritableComparable, ? super Writable>)
+            ReflectionUtils.newInstance(cls, this.getConf());
+    this.actualOutputFormat = actualOF;
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    if (!this.initialized) {
+      createActualOF();
+      this.initialized = true;
+    }
+    this.actualOutputFormat.checkOutputSpecs(ignored, job);
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordWriter<K, V> getRecordWriter(FileSystem ignored,
+       JobConf job, String name, Progressable progress) throws IOException {
+    if (!this.initialized) {
+      createActualOF();
+      this.initialized = true;
+    }
+    return (RecordWriter<K, V>) this.actualOutputFormat.getRecordWriter(ignored,
+                 job, name, progress);
+  }
+
+  @Override
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+      JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+    if (!this.initialized) {
+      createActualOF();
+      this.initialized = true;
+    }
+    if (this.actualOutputFormat instanceof HiveOutputFormat) {
+      return ((HiveOutputFormat<K, V>) this.actualOutputFormat).getHiveRecordWriter(jc,
+           finalOutPath, valueClass, isCompressed, tableProperties, progress);
+    }
+    else {
+      FileSystem fs = finalOutPath.getFileSystem(jc);
+      HivePassThroughRecordWriter hivepassthroughrecordwriter = new HivePassThroughRecordWriter(
+              this.actualOutputFormat.getRecordWriter(fs, jc, null, progress));
+      return hivepassthroughrecordwriter;
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration config) {
+    if (config.get(HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+      actualOutputFormatClass = config.get(HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY);
+    }
+    this.conf = config;
+  }
+}
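
How the two halves of the handshake meet, as a sketch: the class name below is the hypothetical ExampleSinkOutputFormat from the earlier sketch, everything else is from this commit.

import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;

public class PassThroughHandshakeDemo {
  public static void main(String[] args) {
    JobConf jc = new JobConf();
    // planning side (PlanUtils below): the real OutputFormat class name is
    // recorded under the pass-through jobconf key
    jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
        "com.example.ExampleSinkOutputFormat");
    // execution side (FileSinkOperator / HiveFileFormatUtils above): the wrapper
    // is built through ReflectionUtils so that setConf() runs and picks up the key;
    // the first checkOutputSpecs/getRecordWriter call then reflects the real
    // format and delegates to it
    HivePassThroughOutputFormat<?, ?> passThrough =
        ReflectionUtils.newInstance(HivePassThroughOutputFormat.class, jc);
  }
}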

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1522574&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Thu Sep 12 13:40:06 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+public class HivePassThroughRecordWriter<K extends WritableComparable<?>, V extends Writable>
+    implements RecordWriter {
+
+  private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
+
+  public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
+    this.mWriter = writer;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void write(Writable r) throws IOException {
+    mWriter.write(null, (V) r);
+  }
+
+  public void close(boolean abort) throws IOException {
+    //close with null reporter
+    mWriter.close(null);
+  }
+}
+
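
A minimal use of the wrapper, assuming rw came from the wrapped format's getRecordWriter(); note that the key position is discarded on write:

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.HivePassThroughRecordWriter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordWriter;

public class PassThroughWriterDemo {
  static void demo(RecordWriter<Text, Text> rw) throws IOException {
    HivePassThroughRecordWriter<Text, Text> hiveWriter =
        new HivePassThroughRecordWriter<Text, Text>(rw);
    hiveWriter.write(new Text("a row"));  // forwarded as rw.write(null, value)
    hiveWriter.close(false);              // forwarded as rw.close(null)
  }
}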

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Thu Sep 12 13:40:06 2013
@@ -301,7 +301,7 @@ public class Partition implements Serial
   public void setOutputFormatClass(Class<? extends HiveOutputFormat> outputFormatClass) {
     this.outputFormatClass = outputFormatClass;
     tPartition.getSd().setOutputFormat(HiveFileFormatUtils
-      .getOutputFormatSubstitute(outputFormatClass).toString());
+        .getOutputFormatSubstitute(outputFormatClass, false).toString());
   }
 
   final public Class<? extends InputFormat> getInputFormatClass()
@@ -339,7 +339,7 @@ public class Partition implements Serial
             JavaUtils.getClassLoader()));
         // Replace FileOutputFormat for backward compatibility
         if (!HiveOutputFormat.class.isAssignableFrom(c)) {
-          outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c);
+          outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c,false);
         } else {
           outputFormatClass = (Class<? extends HiveOutputFormat>)c;
         }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Thu Sep 12 13:40:06 2013
@@ -314,7 +314,7 @@ public class Table implements Serializab
 
   final public Class<? extends HiveOutputFormat> getOutputFormatClass() {
     // Replace FileOutputFormat for backward compatibility
-
+    boolean storagehandler = false;
     if (outputFormatClass == null) {
       try {
         String className = tTable.getSd().getOutputFormat();
@@ -329,7 +329,13 @@ public class Table implements Serializab
             JavaUtils.getClassLoader());
         }
         if (!HiveOutputFormat.class.isAssignableFrom(c)) {
-          outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c);
+          storagehandler = getStorageHandler() != null;
+          outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c, storagehandler);
         } else {
           outputFormatClass = (Class<? extends HiveOutputFormat>)c;
         }
@@ -672,7 +678,7 @@ public class Table implements Serializab
     try {
       Class<?> origin = Class.forName(name, true, JavaUtils.getClassLoader());
       setOutputFormatClass(HiveFileFormatUtils
-          .getOutputFormatSubstitute(origin));
+          .getOutputFormatSubstitute(origin,false));
     } catch (ClassNotFoundException e) {
       throw new HiveException("Class not found: " + name, e);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java Thu Sep 12 13:40:06 2013
@@ -403,7 +403,7 @@ public class CreateTableDesc extends DDL
         Class<?> origin = Class.forName(this.getOutputFormat(), true,
           JavaUtils.getClassLoader());
         Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
-          .getOutputFormatSubstitute(origin);
+          .getOutputFormatSubstitute(origin,false);
         if (replaced == null) {
           throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE
             .getMsg());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java Thu Sep 12 13:40:06 2013
@@ -74,7 +74,7 @@ public class PartitionDesc implements Se
     this.inputFileFormatClass = inputFileFormatClass;
     if (outputFormat != null) {
       outputFileFormatClass = HiveFileFormatUtils
-          .getOutputFormatSubstitute(outputFormat);
+          .getOutputFormatSubstitute(outputFormat,false);
     }
     if (serdeClassName != null) {
       this.serdeClassName = serdeClassName;
@@ -177,7 +177,7 @@ public class PartitionDesc implements Se
 
   public void setOutputFileFormatClass(final Class<?> outputFileFormatClass) {
     this.outputFileFormatClass = HiveFileFormatUtils
-        .getOutputFormatSubstitute(outputFileFormatClass);
+        .getOutputFormatSubstitute(outputFileFormatClass,false);
   }
 
   @Explain(displayName = "properties", normalExplain = false)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Thu Sep 12 13:40:06 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -36,7 +37,9 @@ import org.apache.hadoop.hive.ql.exec.Co
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
@@ -773,6 +776,10 @@ public final class PlanUtils {
         // for native tables, leave it null to avoid cluttering up
         // plans.
         if (!jobProperties.isEmpty()) {
+          if (HivePassThroughOutputFormat.HIVE_PASSTHROUGH_OF_CLASSNAME.equals(
+              tableDesc.getOutputFileFormatClass().getName())) {
+            // record the real output format when we register this for the table
+            jobProperties.put(
+                HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
+                HiveFileFormatUtils.getRealOutputFormatClassName());
+          }
           tableDesc.setJobProperties(jobProperties);
         }
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Thu Sep 12 13:40:06 2013
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
 
@@ -51,7 +52,7 @@ public class TableDesc implements Serial
     deserializerClass = serdeClass;
     this.inputFileFormatClass = inputFileFormatClass;
     outputFileFormatClass = HiveFileFormatUtils
-        .getOutputFormatSubstitute(class1);
+        .getOutputFormatSubstitute(class1, false);
     this.properties = properties;
     serdeClassName = properties
         .getProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB);
@@ -91,7 +92,7 @@ public class TableDesc implements Serial
 
   public void setOutputFileFormatClass(final Class<?> outputFileFormatClass) {
     this.outputFileFormatClass = HiveFileFormatUtils
-        .getOutputFormatSubstitute(outputFileFormatClass);
+        .getOutputFormatSubstitute(outputFileFormatClass, false);
   }
 
   @Explain(displayName = "properties", normalExplain = false)
@@ -141,7 +142,12 @@ public class TableDesc implements Serial
 
   @Explain(displayName = "output format")
   public String getOutputFileFormatClassName() {
-    return getOutputFileFormatClass().getName();
+    if (HivePassThroughOutputFormat.HIVE_PASSTHROUGH_OF_CLASSNAME.equals(
+        getOutputFileFormatClass().getName())) {
+      return HiveFileFormatUtils.getRealOutputFormatClassName();
+    } else {
+      return getOutputFileFormatClass().getName();
+    }
   }
 
   public boolean isNonNative() {