Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/04/17 16:31:04 UTC

svn commit: r1327170 - in /incubator/hcatalog/branches/branch-0.4: ./ CHANGES.txt src/java/org/apache/hcatalog/common/HCatConstants.java src/java/org/apache/hcatalog/pig/HCatLoader.java src/java/org/apache/hcatalog/pig/HCatStorer.java

Author: toffer
Date: Tue Apr 17 16:31:03 2012
New Revision: 1327170

URL: http://svn.apache.org/viewvc?rev=1327170&view=rev
Log:
merged from trunk: HCATALOG-314 HCatOutputFormat.setOutput is called more than once by HCatStorer (avandana via toffer)
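
The patch applies a capture-and-replay pattern: the expensive metastore call
(HCatOutputFormat.setOutput) runs once, the configuration keys it adds or
changes are found by diffing against a snapshot of the job configuration, and
that delta is cached in Pig's UDFContext so later calls replay it instead of
repeating the setup. Below is a minimal, self-contained sketch of the idea;
the class and method names are illustrative, not part of the patch.

    import java.util.Map.Entry;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;

    public class ConfigDeltaSketch {

        // Capture the keys that setup added or changed, by comparing the
        // configuration after setup against a snapshot taken before it.
        static void captureDelta(Configuration before, Configuration after,
                Properties cache) {
            for (Entry<String, String> kv : after) {
                String old = before.get(kv.getKey());
                if (old == null || !kv.getValue().equals(old)) {
                    cache.put(kv.getKey(), kv.getValue());
                }
            }
        }

        // Replay a previously captured delta into a fresh configuration.
        static void replayDelta(Properties cache, Configuration conf) {
            for (String name : cache.stringPropertyNames()) {
                conf.set(name, cache.getProperty(name));
            }
        }

        public static void main(String[] args) {
            Configuration before = new Configuration(false);
            Configuration after = new Configuration(before);
            after.set("hcat.example.key", "set-by-setup"); // stands in for setOutput()

            Properties cache = new Properties();
            captureDelta(before, after, cache);

            Configuration fresh = new Configuration(false);
            replayDelta(cache, fresh);
            System.out.println(fresh.get("hcat.example.key")); // prints set-by-setup
        }
    }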

Modified:
    incubator/hcatalog/branches/branch-0.4/   (props changed)
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java

Propchange: incubator/hcatalog/branches/branch-0.4/
------------------------------------------------------------------------------
  Merged /incubator/hcatalog/trunk:r1327167

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Tue Apr 17 16:31:03 2012
@@ -108,6 +108,8 @@ Release 0.4.0 - Unreleased
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-314 HCatOutputFormat.setOutput is called more than once by HCatStorer (avandana via toffer)
+
   HCAT-378 Found a few source files missing Apache headers (gates)
 
   HCAT-377 Recent changes broke releaseaudit target (gates)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/common/HCatConstants.java Tue Apr 17 16:31:03 2012
@@ -28,15 +28,17 @@ public final class HCatConstants {
 
   public static final String SEQUENCEFILE_INPUT = SequenceFileInputFormat.class.getName();
   public static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
-  
+
   public static final String HCAT_PIG_STORAGE_CLASS = "org.apache.pig.builtin.PigStorage";
   public static final String HCAT_PIG_LOADER = "hcat.pig.loader";
+  public static final String HCAT_PIG_LOADER_LOCATION_SET = HCAT_PIG_LOADER + ".location.set";
   public static final String HCAT_PIG_LOADER_ARGS = "hcat.pig.loader.args";
   public static final String HCAT_PIG_STORER = "hcat.pig.storer";
   public static final String HCAT_PIG_STORER_ARGS = "hcat.pig.storer.args";
   public static final String HCAT_PIG_ARGS_DELIMIT = "hcat.pig.args.delimiter";
   public static final String HCAT_PIG_ARGS_DELIMIT_DEFAULT = ",";
-  
+  public static final String HCAT_PIG_STORER_LOCATION_SET = HCAT_PIG_STORER + ".location.set";
+
   //The keys used to store info into the job Configuration
   public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
 
@@ -59,7 +61,7 @@ public final class HCatConstants {
 
   public static final String HCAT_CREATE_DB_NAME = "hcat.create.db.name";
 
-  public static final String HCAT_METASTORE_PRINCIPAL 
+  public static final String HCAT_METASTORE_PRINCIPAL
           = HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname;
 
   // IMPORTANT IMPORTANT IMPORTANT!!!!!
@@ -82,11 +84,11 @@ public final class HCatConstants {
 
   public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
   public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
-  
+
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
   public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
-  
+
   public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
   public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
 
@@ -103,7 +105,7 @@ public final class HCatConstants {
 
   // System environment variables
   public static final String SYSENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION";
-  
+
   // Hadoop Conf Var Names
   public static final String CONF_MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
 

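The two new constants are marker keys: once HCatLoader or HCatStorer has set
up its input or output location for a given signature, the marker is written
into the UDFContext properties and later calls skip the setup. Because they
are derived from the existing prefixes, they expand as shown in this trivial
check (it only assumes HCatConstants is on the classpath):

    import org.apache.hcatalog.common.HCatConstants;

    public class MarkerKeys {
        public static void main(String[] args) {
            // Derived from HCAT_PIG_LOADER and HCAT_PIG_STORER above.
            System.out.println(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET);
            // -> hcat.pig.loader.location.set
            System.out.println(HCatConstants.HCAT_PIG_STORER_LOCATION_SET);
            // -> hcat.pig.storer.location.set
        }
    }
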
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatLoader.java Tue Apr 17 16:31:03 2012
@@ -18,7 +18,11 @@
 package org.apache.hcatalog.pig;
 
 import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
@@ -26,6 +30,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.Pair;
@@ -53,10 +58,13 @@ public class HCatLoader extends HCatBase
   private String hcatServerUri;
   private String partitionFilterString;
   private final PigHCatUtil phutil = new PigHCatUtil();
-  
+
   // Signature for wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
   final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
   final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";
+  // A hash map that stores job credentials. The key is a signature passed by Pig, which is
+  // unique to the load func and input file name (the table, in our case).
+  private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
 
   @Override
   public InputFormat<?,?> getInputFormat() throws IOException {
@@ -74,11 +82,16 @@ public class HCatLoader extends HCatBase
 @Override
   public void setLocation(String location, Job job) throws IOException {
 
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties udfProps = udfContext.getUDFProperties(this.getClass(),
+        new String[]{signature});
     job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
     Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
     dbName = dbTablePair.first;
     tableName = dbTablePair.second;
 
+    RequiredFieldList requiredFieldsInfo = (RequiredFieldList) udfProps
+        .get(PRUNE_PROJECTION_INFO);
     // get partitionFilterString stored in the UDFContext - it would have
     // been stored there by an earlier call to setPartitionFilter
     // call setInput on HCatInputFormat only in the frontend because internally
@@ -86,50 +99,71 @@ public class HCatLoader extends HCatBase
     // the backend
     // in the hadoop front end mapred.task.id property will not be set in
     // the Configuration
-    if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-      HCatInputFormat.setInput(job,
-                               InputJobInfo.create(dbName,
-                                                   tableName,
-                                                   getPartitionFilterString()));
-    }
 
-    // Need to also push projections by calling setOutputSchema on
-    // HCatInputFormat - we have to get the RequiredFields information
-    // from the UdfContext, translate it to an Schema and then pass it
-    // The reason we do this here is because setLocation() is called by
-    // Pig runtime at InputFormat.getSplits() and
-    // InputFormat.createRecordReader() time - we are not sure when
-    // HCatInputFormat needs to know about pruned projections - so doing it
-    // here will ensure we communicate to HCatInputFormat about pruned
-    // projections at getSplits() and createRecordReader() time
+        if (udfProps.containsKey(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET)) {
+                for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements();) {
+                PigHCatUtil.getConfigFromUDFProperties(udfProps,
+                            job.getConfiguration(), emr.nextElement().toString());
+            }
+            Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + signature);
+            if (crd != null) {
+                job.getCredentials().addAll(crd);
+            }
+
+        } else {
+            Job clone = new Job(job.getConfiguration());
+            HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
+                    tableName, getPartitionFilterString()));
+
+            // We will store all the new/changed properties of the job in the
+            // udf context, so that the HCatInputFormat.setInput method need not
+            // be called many times.
+            for (Entry<String,String> keyValue : job.getConfiguration()) {
+                String oldValue = clone.getConfiguration().get(keyValue.getKey());
+                if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+                    udfProps.put(keyValue.getKey(), keyValue.getValue());
+                }
+            }
+            udfProps.put(HCatConstants.HCAT_PIG_LOADER_LOCATION_SET, true);
+
+            // Store credentials in a private hash map and not the udf context,
+            // to make sure they are not public.
+            jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + signature,job.getCredentials());
+        }
 
-    UDFContext udfContext = UDFContext.getUDFContext();
-    Properties props = udfContext.getUDFProperties(this.getClass(),
-        new String[]{signature});
-    RequiredFieldList requiredFieldsInfo =
-      (RequiredFieldList)props.get(PRUNE_PROJECTION_INFO);
-    if(requiredFieldsInfo != null) {
-      // convert to hcatschema and pass to HCatInputFormat
-      try {
-        outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
-        HCatInputFormat.setOutputSchema(job, outputSchema);
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    } else{
-      // else - this means pig's optimizer never invoked the pushProjection
-      // method - so we need all fields and hence we should not call the
-      // setOutputSchema on HCatInputFormat
-      if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
-        try {
-          HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
-          outputSchema = hcatTableSchema;
-          HCatInputFormat.setOutputSchema(job, outputSchema);
-        } catch (Exception e) {
-          throw new IOException(e);
+        // Need to also push projections by calling setOutputSchema on
+        // HCatInputFormat - we have to get the RequiredFields information
+        // from the UDFContext, translate it to a Schema, and then pass it on.
+        // We do this here because setLocation() is called by the
+        // Pig runtime at InputFormat.getSplits() and
+        // InputFormat.createRecordReader() time - we are not sure when
+        // HCatInputFormat needs to know about pruned projections - so doing it
+        // here ensures we communicate the pruned projections to HCatInputFormat
+        // at getSplits() and createRecordReader() time
+
+        if(requiredFieldsInfo != null) {
+          // convert to hcatschema and pass to HCatInputFormat
+          try {
+            outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
+            HCatInputFormat.setOutputSchema(job, outputSchema);
+          } catch (Exception e) {
+            throw new IOException(e);
+          }
+        } else{
+          // else - this means pig's optimizer never invoked the pushProjection
+          // method - so we need all fields and hence we should not call the
+          // setOutputSchema on HCatInputFormat
+          if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
+            try {
+              HCatSchema hcatTableSchema = (HCatSchema) udfProps.get(HCatConstants.HCAT_TABLE_SCHEMA);
+              outputSchema = hcatTableSchema;
+              HCatInputFormat.setOutputSchema(job, outputSchema);
+            } catch (Exception e) {
+              throw new IOException(e);
+            }
+          }
         }
-      }
-    }
+
   }
 
   @Override

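Credentials are deliberately kept out of the UDFContext: UDFContext
properties end up serialized into the job configuration, which would expose
the delegation tokens. A private static map keyed by the Pig signature keeps
them in-process instead. A minimal sketch of that half of the pattern, with
illustrative class and method names:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.Credentials;

    public class CredentialCacheSketch {

        // One entry per load/store func signature, so two HCat loaders or
        // storers in the same script do not clobber each other's tokens.
        private static final Map<String, Credentials> jobCredentials =
                new HashMap<String, Credentials>();

        static void save(String signature, Job job) {
            jobCredentials.put(signature, job.getCredentials());
        }

        static void restore(String signature, Job job) {
            Credentials crd = jobCredentials.get(signature);
            if (crd != null) {
                // Merge the cached tokens into the newly created Job.
                job.getCredentials().addAll(crd);
            }
        }

        public static void main(String[] args) throws Exception {
            Job first = new Job(new Configuration());
            save("sig_A", first); // front end: stash the tokens
            Job later = new Job(new Configuration());
            restore("sig_A", later); // later call: merge them back
        }
    }
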
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1327170&r1=1327169&r2=1327170&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Apr 17 16:31:03 2012
@@ -19,15 +19,19 @@
 package org.apache.hcatalog.pig;
 
 import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
@@ -49,6 +53,9 @@ public class HCatStorer extends HCatBase
   // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
   final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
   final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+  // A hash map that stores job credentials. The key is a signature passed by Pig, which is
+  // unique to the store func and output file name (the table, in our case).
+  private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
 
 
   public HCatStorer(String partSpecs, String schema) throws Exception {
@@ -70,74 +77,77 @@ public class HCatStorer extends HCatBase
 
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
-    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
-    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
 
+    Configuration config = job.getConfiguration();
+    config.set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
+    Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+            this.getClass(), new String[] { sign });
     String[] userStr = location.split("\\.");
-    OutputJobInfo outputJobInfo;
 
-    String outInfoString = p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-    if (outInfoString != null) {
-      outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outInfoString);
+    if (udfProps.containsKey(HCatConstants.HCAT_PIG_STORER_LOCATION_SET)) {
+      for (Enumeration<Object> emr = udfProps.keys(); emr.hasMoreElements();) {
+        PigHCatUtil.getConfigFromUDFProperties(udfProps, config, emr.nextElement().toString());
+      }
+      Credentials crd = jobCredentials.get(INNER_SIGNATURE_PREFIX + "_" + sign);
+      if (crd != null) {
+        job.getCredentials().addAll(crd);
+      }
     } else {
-      if(userStr.length == 2) {
-        outputJobInfo = OutputJobInfo.create(userStr[0],
-                                                             userStr[1],
-                                                             partitions);
-      } else if(userStr.length == 1) {
-        outputJobInfo = OutputJobInfo.create(null,
-                                                             userStr[0],
-                                                             partitions);
+      Job clone = new Job(job.getConfiguration());
+      OutputJobInfo outputJobInfo;
+      if (userStr.length == 2) {
+        outputJobInfo = OutputJobInfo.create(userStr[0], userStr[1], partitions);
+      } else if (userStr.length == 1) {
+        outputJobInfo = OutputJobInfo.create(null, userStr[0], partitions);
       } else {
-        throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
+        throw new FrontendException("location " + location
+              + " is invalid. It must be of the form [db.]table",
+              PigHCatUtil.PIG_EXCEPTION_CODE);
       }
-    }
-
-
-    Configuration config = job.getConfiguration();
-    if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
-
-      Schema schema = (Schema)ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
-      if(schema != null){
+      Schema schema = (Schema) ObjectSerializer.deserialize(udfProps.getProperty(PIG_SCHEMA));
+      if (schema != null) {
         pigSchema = schema;
       }
-      if(pigSchema == null){
-        throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
+      if (pigSchema == null) {
+        throw new FrontendException(
+            "Schema for data cannot be determined.",
+            PigHCatUtil.PIG_EXCEPTION_CODE);
       }
-      try{
+      try {
         HCatOutputFormat.setOutput(job, outputJobInfo);
-      } catch(HCatException he) {
-          // pass the message to the user - essentially something about the table
-          // information passed to HCatOutputFormat was not right
-          throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      } catch (HCatException he) {
+        // pass the message to the user - essentially something about
+        // the table
+        // information passed to HCatOutputFormat was not right
+        throw new PigException(he.getMessage(),
+            PigHCatUtil.PIG_EXCEPTION_CODE, he);
       }
       HCatSchema hcatTblSchema = HCatOutputFormat.getTableSchema(job);
-      try{
+      try {
         doSchemaValidations(pigSchema, hcatTblSchema);
-      } catch(HCatException he){
+      } catch (HCatException he) {
         throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
       }
-      computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
+      computedSchema = convertPigSchemaToHCatSchema(pigSchema, hcatTblSchema);
       HCatOutputFormat.setSchema(job, computedSchema);
-      p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
-      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_OUTPUT_INFO);
+      udfProps.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
 
-      p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
-
-    }else{
-      config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
-      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+      // We will store all the new/changed properties of the job in the
+      // udf context, so that the HCatOutputFormat.setOutput and setSchema
+      // methods need not be called many times.
+      for ( Entry<String,String> keyValue : job.getConfiguration()) {
+        String oldValue = clone.getConfiguration().get(keyValue.getKey());
+        if ((oldValue == null) || (keyValue.getValue().equals(oldValue) == false)) {
+          udfProps.put(keyValue.getKey(), keyValue.getValue());
+        }
+      }
+      // Store credentials in a private hash map and not the udf context,
+      // to make sure they are not public.
+      jobCredentials.put(INNER_SIGNATURE_PREFIX + "_" + sign,job.getCredentials());
+      udfProps.put(HCatConstants.HCAT_PIG_STORER_LOCATION_SET, true);
     }
   }
 
-
   @Override
   public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
     if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
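
The net effect, for both the loader and the storer, is that Pig may call
setLocation or setStoreLocation any number of times (query planning,
getSplits, task setup) while the metastore work happens exactly once. A toy
harness demonstrating the guard follows; the marker key matches the constant
added above, everything else is illustrative:

    import java.util.Properties;

    public class SetOutputOnceSketch {

        static int setupCalls = 0; // stands in for HCatOutputFormat.setOutput

        static final String MARKER = "hcat.pig.storer.location.set";

        static void setStoreLocation(Properties udfProps) {
            if (!udfProps.containsKey(MARKER)) {
                setupCalls++; // expensive metastore work happens here, once
                udfProps.put(MARKER, Boolean.TRUE);
            }
        }

        public static void main(String[] args) {
            Properties udfProps = new Properties(); // shared UDFContext props
            setStoreLocation(udfProps);
            setStoreLocation(udfProps);
            setStoreLocation(udfProps);
            System.out.println("setup calls: " + setupCalls); // prints 1
        }
    }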