Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/16 19:28:25 UTC

svn commit: r1587990 - in /pig/trunk: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java

Author: rohini
Date: Wed Apr 16 17:28:25 2014
New Revision: 1587990

URL: http://svn.apache.org/r1587990
Log:
PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
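
The diff below cuts redundant NameNode calls by resolving the Avro input schema once on the
frontend and caching it in the UDFContext properties, keyed by the UDF signature, so that
backend tasks can rebuild the schema from the cached string instead of re-scanning the input
paths. A minimal sketch of that pattern follows (illustrative only, not the committed code:
the property key "avro_input_schema", the UDFContext and Schema.Parser calls, and the
contextSignature field are taken from the diff; the surrounding wiring is simplified):

    // Sketch of the caching pattern. Relevant classes:
    //   org.apache.avro.Schema, org.apache.pig.impl.util.UDFContext, java.util.Properties
    Properties udfProps = UDFContext.getUDFContext()
            .getUDFProperties(AvroStorage.class, new String[] { contextSignature });

    // Frontend (getSchema): cache the resolved schema as a JSON string.
    udfProps.put("avro_input_schema", inputAvroSchema.toString());

    // Backend (setLocation in a task): parse the cached string instead of
    // re-listing the input directories on the NameNode.
    if (!UDFContext.getUDFContext().isFrontend()) {
        String cached = udfProps.getProperty("avro_input_schema");
        if (cached != null) {
            inputAvroSchema = new Schema.Parser().parse(cached);
        }
    }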

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1587990&r1=1587989&r2=1587990&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 16 17:28:25 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
+
 PIG-3851: Upgrade jline to 2.11 (daijy)
 
 PIG-3884: Move multi store counters to PigStatsUtil from MRPigStatsUtil (rohini)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1587990&r1=1587989&r2=1587990&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Wed Apr 16 17:28:25 2014
@@ -19,6 +19,7 @@ package org.apache.pig.piggybank.storage
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -27,9 +28,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +57,7 @@ import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -64,9 +69,13 @@ import org.json.simple.parser.ParseExcep
  */
 public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
 
+    private static final Log LOG = LogFactory.getLog(AvroStorage.class);
     /* storeFunc parameters */
     private static final String NOTNULL = "NOTNULL";
     private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
+    private static final String AVRO_INPUT_SCHEMA_PROPERTY = "avro_input_schema";
+    private static final String AVRO_INPUT_PIG_SCHEMA_PROPERTY = "avro_input_pig_schema";
+    private static final String AVRO_MERGED_SCHEMA_PROPERTY = "avro_merged_schema_map";
     private static final String SCHEMA_DELIM = "#";
     private static final String SCHEMA_KEYVALUE_DELIM = "@";
     private static final String NO_SCHEMA_CHECK = "no_schema_check";
@@ -145,12 +154,38 @@ public class AvroStorage extends FileInp
     /**
      * Set input location and obtain input schema.
      */
+    @SuppressWarnings("unchecked")
     @Override
     public void setLocation(String location, Job job) throws IOException {
         if (inputAvroSchema != null) {
             return;
         }
 
+        if (!UDFContext.getUDFContext().isFrontend()) {
+            Properties udfProps = getUDFProperties();
+            String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
+            if (mergedSchema != null) {
+                HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
+                        (HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
+                schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
+                for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
+                    schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
+                }
+            }
+            String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
+            if (schema != null) {
+                try {
+                    inputAvroSchema = new Schema.Parser().parse(schema);
+                    return;
+                } catch (Exception e) {
+                    // Cases like testMultipleSchemas2 cause exception while deserializing
+                    // symbols. In that case, we get it again.
+                    LOG.warn("Exception while trying to deserialize schema in backend. " +
+                            "Will construct again. schema= " + schema, e);
+                }
+            }
+        }
+
         Configuration conf = job.getConfiguration();
         Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
         if (!paths.isEmpty()) {
@@ -158,10 +193,19 @@ public class AvroStorage extends FileInp
             // bloat configuration size
             FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
             // Scan all directories including sub directories for schema
-            setInputAvroSchema(paths, conf);
+            if (inputAvroSchema == null) {
+                setInputAvroSchema(paths, conf);
+            }
         } else {
             throw new IOException("Input path \'" + location + "\' is not found");
         }
+
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+        super.setUDFContextSignature(signature);
     }
 
     /**
@@ -284,7 +328,8 @@ public class AvroStorage extends FileInp
             }
         }
         // schemaToMergedSchemaMap is only needed when merging multiple records.
-        if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
+        if ((schemaToMergedSchemaMap == null || schemaToMergedSchemaMap.isEmpty()) &&
+                mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
             schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
         }
         return result;
@@ -398,6 +443,19 @@ public class AvroStorage extends FileInp
             if (pigSchema.getFields().length == 1){
                 pigSchema = pigSchema.getFields()[0].getSchema();
             }
+            Properties udfProps = getUDFProperties();
+            udfProps.put(AVRO_INPUT_SCHEMA_PROPERTY, inputAvroSchema.toString());
+            udfProps.put(AVRO_INPUT_PIG_SCHEMA_PROPERTY, pigSchema);
+            if (schemaToMergedSchemaMap != null) {
+                HashMap<URI, Map<Integer, Integer>> mergedSchemaMap = new HashMap<URI, Map<Integer, Integer>>();
+                for (Entry<Path, Map<Integer, Integer>> entry : schemaToMergedSchemaMap.entrySet()) {
+                    //Path is not serializable
+                    mergedSchemaMap.put(entry.getKey().toUri(), entry.getValue());
+                }
+                udfProps.put(AVRO_MERGED_SCHEMA_PROPERTY,
+                        ObjectSerializer.serialize(mergedSchemaMap));
+            }
+
             return pigSchema;
         } else {
             return null;
@@ -731,6 +789,7 @@ public class AvroStorage extends FileInp
     @Override
     public void setStoreFuncUDFContextSignature(String signature) {
         this.contextSignature = signature;
+        super.setUDFContextSignature(signature);
     }
 
     @Override
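
Design note on the merged-schema map handled above: org.apache.hadoop.fs.Path does not
implement java.io.Serializable, so the patch round-trips the map keys through java.net.URI
before calling ObjectSerializer.serialize, and converts them back to Path on the backend.
A condensed sketch of that round trip, assuming the field and property names used in the
diff (the variable names here are illustrative):

    // Frontend: convert Path keys to serializable URIs before storing in UDF properties.
    HashMap<URI, Map<Integer, Integer>> serializableMap = new HashMap<URI, Map<Integer, Integer>>();
    for (Entry<Path, Map<Integer, Integer>> e : schemaToMergedSchemaMap.entrySet()) {
        serializableMap.put(e.getKey().toUri(), e.getValue());
    }
    udfProps.put("avro_merged_schema_map", ObjectSerializer.serialize(serializableMap));

    // Backend: deserialize and convert the URI keys back to Paths.
    HashMap<URI, Map<Integer, Integer>> restored = (HashMap<URI, Map<Integer, Integer>>)
            ObjectSerializer.deserialize(udfProps.getProperty("avro_merged_schema_map"));
    schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
    for (Entry<URI, Map<Integer, Integer>> e : restored.entrySet()) {
        schemaToMergedSchemaMap.put(new Path(e.getKey()), e.getValue());
    }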