You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 15:40:06 UTC
svn commit: r1522574 - in /hive/trunk:
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/results/positive/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hiv...
Author: hashutosh
Date: Thu Sep 12 13:40:06 2013
New Revision: 1522574
URL: http://svn.apache.org/r1522574
Log:
HIVE-5260 : Introduce HivePassThroughOutputFormat that allows Hive to use general purpose OutputFormats instead of HiveOutputFormats in StorageHandlers (Viraj Bhat via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
Modified:
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Thu Sep 12 13:40:06 2013
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
@@ -34,7 +35,9 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
/**
@@ -65,6 +69,11 @@ public class HBaseStorageHandler extends
final static public String DEFAULT_PREFIX = "default.";
+ //Check if the configure job properties is called from input
+ // or output for setting asymmetric properties
+ private boolean configureInputJobProps = true;
+
+ private Configuration jobConf;
private Configuration hbaseConf;
private HBaseAdmin admin;
@@ -85,11 +94,16 @@ public class HBaseStorageHandler extends
// for backwards compatibility with the original specs).
String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
if (tableName == null) {
+ //convert to lower case in case we are getting from serde
tableName = tbl.getSd().getSerdeInfo().getParameters().get(
HBaseSerDe.HBASE_TABLE_NAME);
+ //standardize to lower case
+ if (tableName != null) {
+ tableName = tableName.toLowerCase();
+ }
}
if (tableName == null) {
- tableName = tbl.getDbName() + "." + tbl.getTableName();
+ tableName = (tbl.getDbName() + "." + tbl.getTableName()).toLowerCase();
if (tableName.startsWith(DEFAULT_PREFIX)) {
tableName = tableName.substring(DEFAULT_PREFIX.length());
}
@@ -230,8 +244,13 @@ public class HBaseStorageHandler extends
return hbaseConf;
}
+  /**
+   * Returns the raw job configuration most recently handed to {@code setConf},
+   * i.e. the configuration before HBase defaults are layered onto it.
+   *
+   * @return the stored job configuration, or {@code null} if setConf was never called
+   */
+  public Configuration getJobConf() {
+    return this.jobConf;
+  }
+
+ /**
+  * Keeps the incoming configuration unchanged as the job configuration
+  * (returned by getJobConf()) and derives a separate HBase configuration
+  * from it via HBaseConfiguration.create(conf).
+  */
@Override
public void setConf(Configuration conf) {
+ jobConf = conf;
hbaseConf = HBaseConfiguration.create(conf);
}
@@ -242,7 +261,7 @@ public class HBaseStorageHandler extends
+ /**
+  * Returns the OutputFormat that writes rows to HBase. The fully qualified
+  * name refers to the same class in this package. HiveHBaseTableOutputFormat
+  * no longer implements HiveOutputFormat, so for tables with a storage handler
+  * Hive substitutes HivePassThroughOutputFormat around it
+  * (see Table.getOutputFormatClass / HiveFileFormatUtils.getOutputFormatSubstitute).
+  */
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- return HiveHBaseTableOutputFormat.class;
+ return org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat.class;
}
@Override
@@ -259,14 +278,18 @@ public class HBaseStorageHandler extends
public void configureInputJobProperties(
TableDesc tableDesc,
Map<String, String> jobProperties) {
- configureTableJobProperties(tableDesc, jobProperties);
+ //Input
+ // Mark this call as input-side so configureTableJobProperties adds the HBase
+ // resources and (when HBase security is enabled) a delegation token to the job conf.
+ // NOTE(review): configureInputJobProps is a mutable instance field shared with
+ // configureOutputJobProperties; if one handler instance is configured for input and
+ // output concurrently, the flag can race. Confirm handler instances are not shared
+ // across threads, or pass the direction to configureTableJobProperties explicitly.
+ this.configureInputJobProps = true;
+ configureTableJobProperties(tableDesc, jobProperties);
}
+ /**
+  * Configures job properties for writing to the HBase table. Besides the common
+  * table properties, the output branch of configureTableJobProperties sets
+  * TableOutputFormat.OUTPUT_TABLE to the resolved HBase table name.
+  * NOTE(review): that branch also builds a copy of the job conf with HBase resources
+  * added but never uses it afterwards; confirm whether it was meant to feed jobProperties.
+  */
@Override
public void configureOutputJobProperties(
TableDesc tableDesc,
Map<String, String> jobProperties) {
- configureTableJobProperties(tableDesc, jobProperties);
+ //Output
+ // Select the output branch of configureTableJobProperties (shared mutable flag).
+ this.configureInputJobProps = false;
+ configureTableJobProperties(tableDesc, jobProperties);
}
@Override
@@ -301,11 +324,59 @@ public class HBaseStorageHandler extends
if (tableName == null) {
tableName =
tableProperties.getProperty(hive_metastoreConstants.META_TABLE_NAME);
+ tableName = tableName.toLowerCase();
if (tableName.startsWith(DEFAULT_PREFIX)) {
tableName = tableName.substring(DEFAULT_PREFIX.length());
}
}
jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
+
+ Configuration jobConf = getJobConf();
+ addHBaseResources(jobConf, jobProperties);
+
+ // do this for reconciling HBaseStorageHandler for use in HCatalog
+ // check to see if this an input job or an outputjob
+ if (this.configureInputJobProps) {
+ try {
+ HBaseConfiguration.addHbaseResources(jobConf);
+ addHBaseDelegationToken(jobConf);
+ }//try
+ catch (IOException e) {
+ throw new IllegalStateException("Error while configuring input job properties", e);
+ } //input job properties
+ }
+ else {
+ Configuration copyOfConf = new Configuration(jobConf);
+ HBaseConfiguration.addHbaseResources(copyOfConf);
+ jobProperties.put(TableOutputFormat.OUTPUT_TABLE, tableName);
+ } // output job properties
+ }
+
+  /**
+   * Collects the properties defined by hbase-default.xml and hbase-site.xml into
+   * {@code newJobProperties}, skipping every key the job configuration already
+   * defines so that existing job settings take precedence.
+   *
+   * @param jobConf job configuration whose existing keys are left untouched
+   * @param newJobProperties map receiving the HBase properties missing from jobConf
+   */
+  private void addHBaseResources(Configuration jobConf,
+      Map<String, String> newJobProperties) {
+    Configuration hbaseDefaults = new Configuration(false);
+    HBaseConfiguration.addHbaseResources(hbaseDefaults);
+    for (Entry<String, String> property : hbaseDefaults) {
+      String key = property.getKey();
+      boolean definedByJob = jobConf.get(key) != null;
+      if (!definedByJob) {
+        newJobProperties.put(key, property.getValue());
+      }
+    }
+  }
+
+  /**
+   * Obtains an HBase delegation token for the job when HBase security is enabled;
+   * does nothing otherwise.
+   * NOTE(review): the token is requested for a freshly constructed Job built from
+   * {@code conf}; confirm the Job shares its credentials with the caller's configuration
+   * (e.g. when conf is a JobConf), otherwise the token is discarded with the Job.
+   *
+   * @param conf job configuration used to check security and build the Job
+   * @throws IOException if obtaining the token fails or the thread is interrupted
+   */
+  private void addHBaseDelegationToken(Configuration conf) throws IOException {
+    if (User.isHBaseSecurityEnabled(conf)) {
+      try {
+        User.getCurrent().obtainAuthTokenForJob(conf, new Job(conf));
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so code further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw new IOException("Error while obtaining hbase delegation token", e);
+      }
+    }
}
@Override
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Thu Sep 12 13:40:06 2013
@@ -19,27 +19,26 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
/**
@@ -49,7 +48,6 @@ import org.apache.hadoop.util.Progressab
*/
public class HiveHBaseTableOutputFormat extends
TableOutputFormat<ImmutableBytesWritable> implements
- HiveOutputFormat<ImmutableBytesWritable, Put>,
OutputFormat<ImmutableBytesWritable, Put> {
static final Log LOG = LogFactory.getLog(HiveHBaseTableOutputFormat.class);
@@ -66,39 +64,7 @@ public class HiveHBaseTableOutputFormat
* @param progress progress used for status report
* @return the RecordWriter for the output file
*/
- @Override
- public RecordWriter getHiveRecordWriter(
- JobConf jc,
- Path finalOutPath,
- Class<? extends Writable> valueClass,
- boolean isCompressed,
- Properties tableProperties,
- final Progressable progressable) throws IOException {
-
- String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
- jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
- final boolean walEnabled = HiveConf.getBoolVar(
- jc, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
- final HTable table = new HTable(HBaseConfiguration.create(jc), hbaseTableName);
- table.setAutoFlush(false);
- return new RecordWriter() {
-
- @Override
- public void close(boolean abort) throws IOException {
- if (!abort) {
- table.flushCommits();
- }
- }
-
- @Override
- public void write(Writable w) throws IOException {
- Put put = (Put) w;
- put.setWriteToWAL(walEnabled);
- table.put(put);
- }
- };
- }
@Override
public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
@@ -127,6 +93,37 @@ public class HiveHBaseTableOutputFormat
String name,
Progressable progressable) throws IOException {
- throw new RuntimeException("Error: Hive should not invoke this method.");
+ String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
+ jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+ final boolean walEnabled = HiveConf.getBoolVar(
+ jobConf, HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED);
+ final HTable table = new HTable(HBaseConfiguration.create(jobConf), hbaseTableName);
+ table.setAutoFlush(false);
+ return new MyRecordWriter(table);
+ }
+
+  /**
+   * Supplies HBase's {@link TableOutputCommitter} as the committer for new-API
+   * (mapreduce) jobs, independent of the given task attempt context.
+   *
+   * @param context task attempt context (not consulted)
+   * @return a new TableOutputCommitter
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    OutputCommitter committer = new TableOutputCommitter();
+    return committer;
+  }
+
+
+  /**
+   * mapred {@code RecordWriter} that sends a copy of every {@link Put} to a single
+   * {@link HTable}. The key passed to {@link #write} is ignored.
+   *
+   * <p>The one-argument constructor leaves each Put's WAL setting untouched. The
+   * two-argument constructor applies an explicit WAL flag to every Put, which is how
+   * the value of {@code HiveConf.ConfVars.HIVE_HBASE_WAL_ENABLED} can be honoured.
+   * NOTE(review): getRecordWriter computes {@code walEnabled} but constructs this writer
+   * with the one-argument constructor, so that setting is currently ignored; it should
+   * call {@code new MyRecordWriter(table, walEnabled)}.
+   */
+  private static class MyRecordWriter
+      implements org.apache.hadoop.mapred.RecordWriter<ImmutableBytesWritable, Put> {
+    private final HTable table;
+    /** True when {@link #walEnabled} must be applied to every written Put. */
+    private final boolean overrideWal;
+    private final boolean walEnabled;
+
+    public MyRecordWriter(HTable table) {
+      this(table, false, false);
+    }
+
+    public MyRecordWriter(HTable table, boolean walEnabled) {
+      this(table, true, walEnabled);
+    }
+
+    private MyRecordWriter(HTable table, boolean overrideWal, boolean walEnabled) {
+      this.table = table;
+      this.overrideWal = overrideWal;
+      this.walEnabled = walEnabled;
+    }
+
+    /** Closes the underlying table; the reporter is not used. */
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      table.close();
+    }
+
+    /** Writes a copy of {@code value} to the table, optionally overriding its WAL flag. */
+    @Override
+    public void write(ImmutableBytesWritable key, Put value) throws IOException {
+      Put put = new Put(value);
+      if (overrideWal) {
+        put.setWriteToWAL(walEnabled);
+      }
+      table.put(put);
+    }
}
}
Modified: hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/external_table_ppd.q.out Thu Sep 12 13:40:06 2013
@@ -60,7 +60,7 @@ Table Parameters:
# Storage Information
SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
-OutputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Modified: hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out (original)
+++ hive/trunk/hbase-handler/src/test/results/positive/hbase_binary_storage_queries.q.out Thu Sep 12 13:40:06 2013
@@ -60,7 +60,7 @@ Table Parameters:
# Storage Information
SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
-OutputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
@@ -231,7 +231,7 @@ Table Parameters:
# Storage Information
SerDe Library: org.apache.hadoop.hive.hbase.HBaseSerDe
InputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
-OutputFormat: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat
+OutputFormat: org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Sep 12 13:40:06 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -926,7 +927,19 @@ public class FileSinkOperator extends Te
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
if (hiveOutputFormat == null) {
try {
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ if (getConf().getTableInfo().getJobProperties() != null) {
+ //Setting only for Storage Handler
+ if (getConf().getTableInfo().getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+ job.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,getConf().getTableInfo().getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+ hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job);
+ }
+ else {
+ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ }
+ }
+ else {
+ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ }
} catch (Exception ex) {
throw new IOException(ex);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Thu Sep 12 13:40:06 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -51,6 +53,7 @@ import org.apache.hadoop.mapred.Sequence
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* An util class for various Hive file format tasks.
@@ -70,6 +73,8 @@ public final class HiveFileFormatUtils {
SequenceFileOutputFormat.class, HiveSequenceFileOutputFormat.class);
}
+ static String realoutputFormat;
+
@SuppressWarnings("unchecked")
private static Map<Class<? extends OutputFormat>, Class<? extends HiveOutputFormat>>
outputFormatSubstituteMap;
@@ -93,16 +98,45 @@ public final class HiveFileFormatUtils {
*/
@SuppressWarnings("unchecked")
public static synchronized Class<? extends HiveOutputFormat> getOutputFormatSubstitute(
- Class<?> origin) {
+ Class<?> origin, boolean storagehandlerflag) {
+ // Classes that already implement HiveOutputFormat are used as-is.
if (HiveOutputFormat.class.isAssignableFrom(origin)) {
return (Class<? extends HiveOutputFormat>) origin;
}
+ // Look up a previously registered substitute; null when none is registered.
Class<? extends HiveOutputFormat> result = outputFormatSubstituteMap
.get(origin);
+ // With storagehandlerflag == false and no registered substitute, null is returned
+ // (CreateTableDesc treats null as an invalid output format).
+ // NOTE(review): once origin is registered here, later calls for the same origin hit
+ // the map and skip setRealOutputFormatClassName, so if another storage handler's
+ // format was recorded in between, getRealOutputFormatClassName() reports that other
+ // class. Confirm this process-wide state is safe with several storage handlers.
+ // NOTE(review): after registration, callers passing false (e.g. PartitionDesc) also
+ // receive HivePassThroughOutputFormat for this origin.
+ //register this output format into the map for the first time
+ if ((storagehandlerflag == true) && (result == null)) {
+ HiveFileFormatUtils.setRealOutputFormatClassName(origin.getName());
+ result = HivePassThroughOutputFormat.class;
+ HiveFileFormatUtils.registerOutputFormatSubstitute((Class<? extends OutputFormat>) origin,HivePassThroughOutputFormat.class);
+ }
return result;
}
/**
+ * Returns the class name of the real (non-Hive) OutputFormat most recently recorded
+ * for HivePassThroughOutputFormat by getOutputFormatSubstitute or
+ * setRealOutputFormatClassName.
+ *
+ * <p>Synchronized on the class, like getOutputFormatSubstitute which writes the value,
+ * so readers on other threads see the latest recorded name.
+ * NOTE(review): this is process-wide state holding only the last recorded name.
+ *
+ * @return the recorded class name, or null if none has been recorded yet
+ */
+ public static synchronized String getRealOutputFormatClassName() {
+   return realoutputFormat;
+ }
+
+ /**
+  * Records the class name of the real OutputFormat wrapped by
+  * HivePassThroughOutputFormat. A null argument is ignored and leaves the previously
+  * recorded name unchanged.
+  *
+  * <p>Synchronized on the class so the write is visible to getRealOutputFormatClassName
+  * and consistent with the class-synchronized getOutputFormatSubstitute that calls it.
+  *
+  * @param destination fully qualified OutputFormat class name; ignored when null
+  */
+ public static synchronized void setRealOutputFormatClassName(String destination) {
+   if (destination != null) {
+     realoutputFormat = destination;
+   }
+ }
+
+
+ /**
* get the final output path of a given FileOutputFormat.
*
* @param parent
@@ -215,9 +249,21 @@ public final class HiveFileFormatUtils {
public static RecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
+ boolean storagehandlerofhivepassthru = false;
+ HiveOutputFormat<?, ?> hiveOutputFormat;
try {
- HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
- .getOutputFileFormatClass().newInstance();
+ if (tableInfo.getJobProperties() != null) {
+ if (tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+ jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+ storagehandlerofhivepassthru = true;
+ }
+ }
+ if (storagehandlerofhivepassthru) {
+ hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(),jc);
+ }
+ else {
+ hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
+ }
boolean isCompressed = conf.getCompressed();
JobConf jc_output = jc;
if (isCompressed) {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1522574&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Thu Sep 12 13:40:06 2013
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Adapter that lets a plain {@link OutputFormat} (one that does not implement
+ * {@link HiveOutputFormat}) be used wherever Hive requires a HiveOutputFormat.
+ *
+ * <p>The wrapped class name is read from the configuration key
+ * {@link #HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY} in {@link #setConf}, and the
+ * delegate is instantiated lazily on first use. Create instances with
+ * {@code ReflectionUtils.newInstance(cls, conf)} so that setConf runs before use.
+ */
+public class HivePassThroughOutputFormat<K, V> implements Configurable, HiveOutputFormat<K, V> {
+
+  public static final String HIVE_PASSTHROUGH_OF_CLASSNAME =
+      "org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";
+
+  /** Configuration key naming the real OutputFormat class to delegate to. */
+  public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
+      "hive.passthrough.storagehandler.of";
+
+  /** Lazily created delegate; null until {@link #ensureInitialized()} has run. */
+  private OutputFormat<? super WritableComparable<?>, ? super Writable> actualOutputFormat;
+  /** Fully qualified class name of the delegate, taken from the configuration. */
+  private String actualOutputFormatClass = "";
+  private Configuration conf;
+  private boolean initialized;
+
+  public HivePassThroughOutputFormat() {
+    // Constructed reflectively (e.g. via ReflectionUtils from FileSinkOperator);
+    // the delegate is built lazily once the configuration is known.
+    this.actualOutputFormat = null;
+    this.initialized = false;
+  }
+
+  /** Creates the delegate on first use; later calls reuse it. */
+  private void ensureInitialized() throws IOException {
+    if (!initialized) {
+      createActualOF();
+      initialized = true;
+    }
+  }
+
+  /**
+   * Loads and instantiates the configured delegate OutputFormat.
+   *
+   * @throws IOException if no class name is configured or the class cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  private void createActualOF() throws IOException {
+    if (actualOutputFormatClass == null || actualOutputFormatClass.isEmpty()) {
+      throw new IOException("No OutputFormat class configured under "
+          + HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY);
+    }
+    Class<? extends OutputFormat> cls;
+    try {
+      cls = (Class<? extends OutputFormat>) Class.forName(actualOutputFormatClass, true,
+          JavaUtils.getClassLoader());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+    this.actualOutputFormat = (OutputFormat<? super WritableComparable<?>, ? super Writable>)
+        ReflectionUtils.newInstance(cls, this.getConf());
+  }
+
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+    ensureInitialized();
+    this.actualOutputFormat.checkOutputSpecs(ignored, job);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public org.apache.hadoop.mapred.RecordWriter<K, V> getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) throws IOException {
+    ensureInitialized();
+    return (RecordWriter<K, V>) this.actualOutputFormat.getRecordWriter(ignored,
+        job, name, progress);
+  }
+
+  /**
+   * Returns a Hive record writer. Delegates that are themselves HiveOutputFormats are
+   * used directly; any other delegate's mapred RecordWriter is wrapped in a
+   * {@link HivePassThroughRecordWriter}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+      JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+    // Share the single lazily created delegate with the other entry points instead of
+    // rebuilding it on every call.
+    ensureInitialized();
+    if (this.actualOutputFormat instanceof HiveOutputFormat) {
+      return ((HiveOutputFormat<K, V>) this.actualOutputFormat).getHiveRecordWriter(jc,
+          finalOutPath, valueClass, isCompressed, tableProperties, progress);
+    }
+    FileSystem fs = finalOutPath.getFileSystem(jc);
+    // NOTE(review): the name argument is null; presumably fine for delegates that ignore
+    // it, but a file-based delegate would need a real name here.
+    HivePassThroughRecordWriter hivepassthroughrecordwriter = new HivePassThroughRecordWriter(
+        this.actualOutputFormat.getRecordWriter(fs, jc, null, progress));
+    return hivepassthroughrecordwriter;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Stores the configuration and, when present, records the delegate class name found
+   * under {@link #HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY}.
+   */
+  @Override
+  public void setConf(Configuration config) {
+    String configuredClass = config.get(HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY);
+    if (configuredClass != null) {
+      actualOutputFormatClass = configuredClass;
+    }
+    this.conf = config;
+  }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1522574&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Thu Sep 12 13:40:06 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Adapts a mapred {@link org.apache.hadoop.mapred.RecordWriter} to Hive's
+ * {@link RecordWriter} interface, so plain OutputFormats can be driven by Hive's
+ * file sink.
+ *
+ * <p>Every row is written with a {@code null} key, so the wrapped writer must accept
+ * null keys.
+ */
+public class HivePassThroughRecordWriter<K extends WritableComparable<?>, V extends Writable>
+    implements RecordWriter {
+
+  private final org.apache.hadoop.mapred.RecordWriter<K, V> delegate;
+
+  public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
+    this.delegate = writer;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(Writable r) throws IOException {
+    delegate.write(null, (V) r);
+  }
+
+  /**
+   * Closes the wrapped writer. The mapred API has no abort notion, so the writer is
+   * closed the same way whether or not {@code abort} is set.
+   */
+  @Override
+  public void close(boolean abort) throws IOException {
+    // Pass a no-op Reporter instead of null so delegates that report progress while
+    // closing do not fail with a NullPointerException.
+    delegate.close(org.apache.hadoop.mapred.Reporter.NULL);
+  }
+}
+
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Thu Sep 12 13:40:06 2013
@@ -301,7 +301,7 @@ public class Partition implements Serial
public void setOutputFormatClass(Class<? extends HiveOutputFormat> outputFormatClass) {
this.outputFormatClass = outputFormatClass;
+ // Partitions never request the storage-handler pass-through (flag = false). The
+ // argument is already a HiveOutputFormat subtype, so getOutputFormatSubstitute
+ // returns it unchanged.
+ // NOTE(review): Class.toString() yields "class <name>" rather than getName();
+ // confirm the storage descriptor is meant to receive that form.
tPartition.getSd().setOutputFormat(HiveFileFormatUtils
- .getOutputFormatSubstitute(outputFormatClass).toString());
+ .getOutputFormatSubstitute(outputFormatClass, false).toString());
}
final public Class<? extends InputFormat> getInputFormatClass()
@@ -339,7 +339,7 @@ public class Partition implements Serial
JavaUtils.getClassLoader()));
// Replace FileOutputFormat for backward compatibility
if (!HiveOutputFormat.class.isAssignableFrom(c)) {
- outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c);
+ outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c,false);
} else {
outputFormatClass = (Class<? extends HiveOutputFormat>)c;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Thu Sep 12 13:40:06 2013
@@ -314,7 +314,7 @@ public class Table implements Serializab
final public Class<? extends HiveOutputFormat> getOutputFormatClass() {
// Replace FileOutputFormat for backward compatibility
-
+ boolean storagehandler = false;
if (outputFormatClass == null) {
try {
String className = tTable.getSd().getOutputFormat();
@@ -329,7 +329,13 @@ public class Table implements Serializab
JavaUtils.getClassLoader());
}
if (!HiveOutputFormat.class.isAssignableFrom(c)) {
- outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c);
+ if (getStorageHandler() != null) {
+ storagehandler = true;
+ }
+ else {
+ storagehandler = false;
+ }
+ outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(c,storagehandler);
} else {
outputFormatClass = (Class<? extends HiveOutputFormat>)c;
}
@@ -672,7 +678,7 @@ public class Table implements Serializab
try {
Class<?> origin = Class.forName(name, true, JavaUtils.getClassLoader());
setOutputFormatClass(HiveFileFormatUtils
- .getOutputFormatSubstitute(origin));
+ .getOutputFormatSubstitute(origin,false));
} catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + name, e);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java Thu Sep 12 13:40:06 2013
@@ -403,7 +403,7 @@ public class CreateTableDesc extends DDL
Class<?> origin = Class.forName(this.getOutputFormat(), true,
JavaUtils.getClassLoader());
Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
- .getOutputFormatSubstitute(origin);
+ .getOutputFormatSubstitute(origin, false);
if (replaced == null) {
throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE
.getMsg());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java Thu Sep 12 13:40:06 2013
@@ -74,7 +74,7 @@ public class PartitionDesc implements Se
this.inputFileFormatClass = inputFileFormatClass;
if (outputFormat != null) {
outputFileFormatClass = HiveFileFormatUtils
- .getOutputFormatSubstitute(outputFormat);
+ .getOutputFormatSubstitute(outputFormat, false);
}
if (serdeClassName != null) {
this.serdeClassName = serdeClassName;
@@ -177,7 +177,7 @@ public class PartitionDesc implements Se
public void setOutputFileFormatClass(final Class<?> outputFileFormatClass) {
this.outputFileFormatClass = HiveFileFormatUtils
- .getOutputFormatSubstitute(outputFileFormatClass);
+ .getOutputFormatSubstitute(outputFileFormatClass, false);
}
@Explain(displayName = "properties", normalExplain = false)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Thu Sep 12 13:40:06 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -36,7 +37,9 @@ import org.apache.hadoop.hive.ql.exec.Co
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
@@ -773,6 +776,10 @@ public final class PlanUtils {
// for native tables, leave it null to avoid cluttering up
// plans.
if (!jobProperties.isEmpty()) {
+ if (HivePassThroughOutputFormat.HIVE_PASSTHROUGH_OF_CLASSNAME.equals(tableDesc.getOutputFileFormatClass().getName())) {
+ // Pass-through wrapper: record the real output format for the table (names compared by value, not reference).
+ jobProperties.put(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY, HiveFileFormatUtils.getRealOutputFormatClassName());
+ }
tableDesc.setJobProperties(jobProperties);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1522574&r1=1522573&r2=1522574&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Thu Sep 12 13:40:06 2013
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.mapred.InputFormat;
@@ -51,7 +52,7 @@ public class TableDesc implements Serial
deserializerClass = serdeClass;
this.inputFileFormatClass = inputFileFormatClass;
outputFileFormatClass = HiveFileFormatUtils
- .getOutputFormatSubstitute(class1);
+ .getOutputFormatSubstitute(class1, false);
this.properties = properties;
serdeClassName = properties
.getProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB);
@@ -91,7 +92,7 @@ public class TableDesc implements Serial
public void setOutputFileFormatClass(final Class<?> outputFileFormatClass) {
this.outputFileFormatClass = HiveFileFormatUtils
- .getOutputFormatSubstitute(outputFileFormatClass);
+ .getOutputFormatSubstitute(outputFileFormatClass, false);
}
@Explain(displayName = "properties", normalExplain = false)
@@ -141,7 +142,13 @@ public class TableDesc implements Serial
@Explain(displayName = "output format")
public String getOutputFileFormatClassName() {
- return getOutputFileFormatClass().getName();
+ String className = getOutputFileFormatClass().getName();
+ // Compare by value: Class.getName() is not guaranteed to return the interned constant.
+ if (HivePassThroughOutputFormat.HIVE_PASSTHROUGH_OF_CLASSNAME.equals(className)) {
+ // Show the real output format in explain output rather than the pass-through wrapper.
+ return HiveFileFormatUtils.getRealOutputFormatClassName();
+ }
+ return className;
}
public boolean isNonNative() {