You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/04/17 16:31:04 UTC
svn commit: r1327170 - in /incubator/hcatalog/branches/branch-0.4: ./
CHANGES.txt src/java/org/apache/hcatalog/common/HCatConstants.java
src/java/org/apache/hcatalog/pig/HCatLoader.java
src/java/org/apache/hcatalog/pig/HCatStorer.java
Author: toffer
Date: Tue Apr 17 16:31:03 2012
New Revision: 1327170
URL: http://svn.apache.org/viewvc?rev=1327170&view=rev
Log:
merged from trunk: HCATALOG-314 HCatOutputFormat.setOutput is called more than once by HCatStorer (avandana via toffer)
Modified:
incubator/hcatalog/branches/branch-0.4/ (props changed)
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
Propchange: incubator/hcatalog/branches/branch-0.4/
------------------------------------------------------------------------------
Merged /incubator/hcatalog/trunk:r1327167
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Tue Apr 17 16:31:03 2012
@@ -108,6 +108,8 @@ Release 0.4.0 - Unreleased
OPTIMIZATIONS
BUG FIXES
+ HCAT-314 HCatOutputFormat.setOutput is called more than once by HCatStorer (avandana via toffer)
+
HCAT-378 Found a few source files missing Apache headers (gates)
HCAT-377 Recent changes broke releaseaudit target (gates)
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java Tue Apr 17 16:31:03 2012
@@ -28,15 +28,17 @@ public final class HCatConstants {
public static final String SEQUENCEFILE_INPUT = SequenceFileInputFormat.class.getName();
public static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
-
+
public static final String HCAT_PIG_STORAGE_CLASS = "org.apache.pig.builtin.PigStorage";
public static final String HCAT_PIG_LOADER = "hcat.pig.loader";
+ public static final String HCAT_PIG_LOADER_LOCATION_SET = HCAT_PIG_LOADER + ".location.set" ;
public static final String HCAT_PIG_LOADER_ARGS = "hcat.pig.loader.args";
public static final String HCAT_PIG_STORER = "hcat.pig.storer";
public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args";
public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter";
public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ",";
-
+ public static final String HCAT_PIG_STORER_LOCATION_SET = HCAT_PIG_STORER + ".location.set" ;
+
//The keys used to store info into the job Configuration
public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
@@ -59,7 +61,7 @@ public final class HCatConstants {
public static final String HCAT_CREATE_DB_NAME = "hcat.create.db.name";
- public static final String HCAT_METASTORE_PRINCIPAL
+ public static final String HCAT_METASTORE_PRINCIPAL
= HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname;
// IMPORTANT IMPORTANT IMPORTANT!!!!!
@@ -82,11 +84,11 @@ public final class HCatConstants {
public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
-
+
public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
-
+
public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
@@ -103,7 +105,7 @@ public final class HCatConstants {
// System environment variables
public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION";
-
+
// Hadoop Conf Var Names
public static final String CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java Tue Apr 17 16:31:03 2012
@@ -18,7 +18,11 @@
package org.apache.hcatalog.pig;
import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
@@ -26,6 +30,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.Pair;
@@ -53,10 +58,13 @@ public class HCatLoader extends HCatBase
private String hcatServerUri;
private String partitionFilterString;
private final PigHCatUtil phutil = new PigHCatUtil();
-
+
// Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+ // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+ //unique to the load func and input file name (table, in our case).
+ private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
@Override
public InputFormat<?,?> getInputFormat() throws IOException {
@@ -74,11 +82,16 @@ public class HCatLoader extends HCatBase
@Override
public void setLocation(String location, Job job) throws IOException {
+ UDFContext udfContext = UDFContext.getUDFContext();
+ Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+ new String[]{signature});
job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
dbName = dbTablePair.first;
tableName = dbTablePair.second;
+ RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+ .get(PRUNE_PROJECTION_INFO);
// get partitionFilterString stored in the UDFContext - it would have
// been stored there by an earlier call to setPartitionFilter
// call setInput on HCatInputFormat only in the frontend because internally
@@ -86,50 +99,71 @@ public class HCatLoader extends HCatBase
// the backend
// in the hadoop front end mapred.task.id property will not be set in
// the Configuration
- if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
- HCatInputFormat.setInput(job,
- InputJobInfo.create(dbName,
- tableName,
- getPartitionFilterString()));
- }
- // Need to also push projections by calling setOutputSchema on
- // HCatInputFormat - we have to get the RequiredFields information
- // from the UdfContext, translate it to an Schema and then pass it
- // The reason we do this here is because setLocation() is called by
- // Pig runtime at InputFormat.getSplits() and
- // InputFormat.createRecordReader() time - we are not sure when
- // HCatInputFormat needs to know about pruned projections - so doing it
- // here will ensure we communicate to HCatInputFormat about pruned
- // projections at getSplits() and createRecordReader() time
+ if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
+ for( Enumeration<Object> emr = udfProps.keys();emr.hasMoreElements();) {
+ PigHCatUtil.getConfigFromUDFProperties(udfProps,
+ job.getConfiguration(), emr.nextElement().toString());
+ }
+ Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + signature);
+ if (crd != null) {
+ job.getCredentials().addAll(crd);
+ }
+
+ } else {
+ Job clone = new Job(job.getConfiguration());
+ HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
+ tableName, getPartitionFilterString()));
+
+ // We will store all the new /changed properties in the job in the
+ // udf context, so the HCatInputFormat.setInput method need not
+ //be called many times.
+ for (Entry<String,String> keyValue : job.getConfiguration()) {
+ String oldValue = clone.getConfiguration().get(keyValue.getKey());
+ if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+ udfProps.put(keyValue.getKey(), keyValue.getValue());
+ }
+ }
+ udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true);
+
+ //Store credentials in a private hash map and not the udf context to
+ // make sure they are not public.
+ jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + signature,job.getCredentials());
+ }
- UDFContext udfContext = UDFContext.getUDFContext();
- Properties props = udfContext.getUDFProperties(this.getClass(),
- new String[]{signature});
- RequiredFieldList requiredFieldsInfo =
- (RequiredFieldList)props.get(PRUNE_PROJECTION_INFO);
- if(requiredFieldsInfo != null) {
- // convert to hcatschema and pass to HCatInputFormat
- try {
- outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
- HCatInputFormat.setOutputSchema(job, outputSchema);
- } catch (Exception e) {
- throw new IOException(e);
- }
- } else{
- // else - this means pig's optimizer never invoked the pushProjection
- // method - so we need all fields and hence we should not call the
- // setOutputSchema on HCatInputFormat
- if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
- try {
- HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
- outputSchema = hcatTableSchema;
- HCatInputFormat.setOutputSchema(job, outputSchema);
- } catch (Exception e) {
- throw new IOException(e);
+ // Need to also push projections by calling setOutputSchema on
+ // HCatInputFormat - we have to get the RequiredFields information
+ // from the UdfContext, translate it to an Schema and then pass it
+ // The reason we do this here is because setLocation() is called by
+ // Pig runtime at InputFormat.getSplits() and
+ // InputFormat.createRecordReader() time - we are not sure when
+ // HCatInputFormat needs to know about pruned projections - so doing it
+ // here will ensure we communicate to HCatInputFormat about pruned
+ // projections at getSplits() and createRecordReader() time
+
+ if(requiredFieldsInfo != null) {
+ // convert to hcatschema and pass to HCatInputFormat
+ try {
+ outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
+ HCatInputFormat.setOutputSchema(job, outputSchema);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else{
+ // else - this means pig's optimizer never invoked the pushProjection
+ // method - so we need all fields and hence we should not call the
+ // setOutputSchema on HCatInputFormat
+ if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
+ try {
+ HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+ outputSchema = hcatTableSchema;
+ HCatInputFormat.setOutputSchema(job, outputSchema);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
}
- }
- }
+
}
@Override
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Apr 17 16:31:03 2012
@@ -19,15 +19,19 @@
package org.apache.hcatalog.pig;
import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
@@ -49,6 +53,9 @@ public class HCatStorer extends HCatBase
// Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+ // A hash map which stores job credentials. The key is a signature passed by Pig, which is
+ //unique to the store func and out file name (table, in our case).
+ private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
public HCatStorer(String partSpecs, String schema) throws Exception {
@@ -70,74 +77,77 @@ public class HCatStorer extends HCatBase
@Override
public void setStoreLocation(String location, Job job) throws IOException {
- job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
- Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
+ Configuration config = job.getConfiguration();
+ config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+ Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[] { sign });
String[] userStr = location.split("\\.");
- OutputJobInfo outputJobInfo;
- String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
- if (outInfoString != null) {
- outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
+ if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+ for(Enumeration<Object> emr = udfProps.keys();emr.hasMoreElements();){
+ PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+ }
+ Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+ if (crd != null) {
+ job.getCredentials().addAll(crd);
+ }
} else {
- if(userStr.length == 2) {
- outputJobInfo = OutputJobInfo.create(userStr[0],
- userStr[1],
- partitions);
- } else if(userStr.length == 1) {
- outputJobInfo = OutputJobInfo.create(null,
- userStr[0],
- partitions);
+ Job clone = new Job(job.getConfiguration());
+ OutputJobInfo outputJobInfo;
+ if (userStr.length == 2) {
+ outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+ } else if (userStr.length == 1) {
+ outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
} else {
- throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+ throw new FrontendException("location " + location
+ + " is invalid. It must be of the form [db.]table",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
}
- }
-
-
- Configuration config = job.getConfiguration();
- if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-
- Schema schema = (Schema)ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
- if(schema != null){
+ Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+ if (schema != null) {
pigSchema = schema;
}
- if(pigSchema == null){
- throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
+ if (pigSchema == null) {
+ throw new FrontendException(
+ "Schema for data cannot be determined.",
+ PigHCatUtil.PIG_EXCEPTION_CODE);
}
- try{
+ try {
HCatOutputFormat.setOutput(job, outputJobInfo);
- } catch(HCatException he) {
- // pass the message to the user - essentially something about the table
- // information passed to HCatOutputFormat was not right
- throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+ } catch (HCatException he) {
+ // pass the message to the user - essentially something about
+ // the table
+ // information passed to HCatOutputFormat was not right
+ throw new PigException(he.getMessage(),
+ PigHCatUtil.PIG_EXCEPTION_CODE, he);
}
HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
- try{
+ try {
doSchemaValidations(pigSchema, hcatTblSchema);
- } catch(HCatException he){
+ } catch (HCatException he) {
throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
}
- computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
+ computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
HCatOutputFormat.setSchema(job, computedSchema);
- p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
- PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
- PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
- PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
- PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
- p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
-
- }else{
- config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
- PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
- PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
- PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+ // We will store all the new /changed properties in the job in the
+ // udf context, so the HCatOutputFormat.setOutput and setSchema
+ // methods need not be called many times.
+ for ( Entry<String,String> keyValue : job.getConfiguration()) {
+ String oldValue = clone.getConfiguration().get(keyValue.getKey());
+ if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+ udfProps.put(keyValue.getKey(), keyValue.getValue());
+ }
+ }
+ //Store credentials in a private hash map and not the udf context to
+ // make sure they are not public.
+ jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign,job.getCredentials());
+ udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
}
}
-
@Override
public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {