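You are viewing a plain-text version of this content. The canonical version is the original message in the commits@pig.apache.org mailing-list archive (the hyperlink was lost in this plain-text rendering).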
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/16 19:28:25 UTC
svn commit: r1587990 - in /pig/trunk: CHANGES.txt
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
Author: rohini
Date: Wed Apr 16 17:28:25 2014
New Revision: 1587990
URL: http://svn.apache.org/r1587990
Log:
PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1587990&r1=1587989&r2=1587990&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 16 17:28:25 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
+
PIG-3851: Upgrade jline to 2.11 (daijy)
PIG-3884: Move multi store counters to PigStatsUtil from MRPigStatsUtil (rohini)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1587990&r1=1587989&r2=1587990&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Wed Apr 16 17:28:25 2014
@@ -19,6 +19,7 @@ package org.apache.pig.piggybank.storage
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -27,9 +28,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +57,7 @@ import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -64,9 +69,13 @@ import org.json.simple.parser.ParseExcep
*/
public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
+ private static final Log LOG = LogFactory.getLog(AvroStorage.class);
/* storeFunc parameters */
private static final String NOTNULL = "NOTNULL";
private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
+ private static final String AVRO_INPUT_SCHEMA_PROPERTY = "avro_input_schema";
+ private static final String AVRO_INPUT_PIG_SCHEMA_PROPERTY = "avro_input_pig_schema";
+ private static final String AVRO_MERGED_SCHEMA_PROPERTY = "avro_merged_schema_map";
private static final String SCHEMA_DELIM = "#";
private static final String SCHEMA_KEYVALUE_DELIM = "@";
private static final String NO_SCHEMA_CHECK = "no_schema_check";
@@ -145,12 +154,38 @@ public class AvroStorage extends FileInp
/**
* Set input location and obtain input schema.
*/
+ @SuppressWarnings("unchecked")
@Override
public void setLocation(String location, Job job) throws IOException {
if (inputAvroSchema != null) {
return;
}
+ if (!UDFContext.getUDFContext().isFrontend()) {
+ Properties udfProps = getUDFProperties();
+ String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
+ if (mergedSchema != null) {
+ HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
+ (HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
+ schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
+ for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
+ schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
+ }
+ }
+ String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
+ if (schema != null) {
+ try {
+ inputAvroSchema = new Schema.Parser().parse(schema);
+ return;
+ } catch (Exception e) {
+ // Cases like testMultipleSchemas2 cause exception while deserializing
+ // symbols. In that case, we get it again.
+ LOG.warn("Exception while trying to deserialize schema in backend. " +
+ "Will construct again. schema= " + schema, e);
+ }
+ }
+ }
+
Configuration conf = job.getConfiguration();
Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
if (!paths.isEmpty()) {
@@ -158,10 +193,19 @@ public class AvroStorage extends FileInp
// bloat configuration size
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
// Scan all directories including sub directories for schema
- setInputAvroSchema(paths, conf);
+ if (inputAvroSchema == null) {
+ setInputAvroSchema(paths, conf);
+ }
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
+
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature) {
+ this.contextSignature = signature;
+ super.setUDFContextSignature(signature);
}
/**
@@ -284,7 +328,8 @@ public class AvroStorage extends FileInp
}
}
// schemaToMergedSchemaMap is only needed when merging multiple records.
- if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
+ if ((schemaToMergedSchemaMap == null || schemaToMergedSchemaMap.isEmpty()) &&
+ mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
}
return result;
@@ -398,6 +443,19 @@ public class AvroStorage extends FileInp
if (pigSchema.getFields().length == 1){
pigSchema = pigSchema.getFields()[0].getSchema();
}
+ Properties udfProps = getUDFProperties();
+ udfProps.put(AVRO_INPUT_SCHEMA_PROPERTY, inputAvroSchema.toString());
+ udfProps.put(AVRO_INPUT_PIG_SCHEMA_PROPERTY, pigSchema);
+ if (schemaToMergedSchemaMap != null) {
+ HashMap<URI, Map<Integer, Integer>> mergedSchemaMap = new HashMap<URI, Map<Integer, Integer>>();
+ for (Entry<Path, Map<Integer, Integer>> entry : schemaToMergedSchemaMap.entrySet()) {
+ //Path is not serializable
+ mergedSchemaMap.put(entry.getKey().toUri(), entry.getValue());
+ }
+ udfProps.put(AVRO_MERGED_SCHEMA_PROPERTY,
+ ObjectSerializer.serialize(mergedSchemaMap));
+ }
+
return pigSchema;
} else {
return null;
@@ -731,6 +789,7 @@ public class AvroStorage extends FileInp
@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
+ super.setUDFContextSignature(signature);
}
@Override