Posted to commits@parquet.apache.org by ju...@apache.org on 2014/07/29 03:07:31 UTC

git commit: Column index access support

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 4a07b3f54 -> 17864dfc0


Column index access support

This patch adds the ability to use column-index-based access to Parquet files in Pig, which allows columns to be renamed, similar to other file formats.  This is achieved by using the parameterized loader with an alternate schema.

Example:
# File Schema: {c1:int, c2:float, c3:chararray}
p = LOAD '/data/parquet/' USING parquet.pig.ParquetLoader('n1:int, n2:float, n3:chararray', 'true');

In this example, the names from the requested schema are mapped to column positions in the file, and tuples are produced by index position.
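
The same behavior is available programmatically through the new Schema-based
constructor introduced in this patch. A minimal sketch (Java; the schema values
mirror the example above, and Utils.getSchemaFromString throws ParserException):

  Schema requested = Utils.getSchemaFromString("n1:int, n2:float, n3:chararray");
  ParquetLoader loader = new ParquetLoader(requested, true);  // 'true' enables column index access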

Two test cases are included that exercise index-based access for both full-file reads and column-projected reads.

Note:  This patch also disables the enforcer plugin on the pig project, per discussion at the Parquet meetup.  The justification is that the enforcer is too strict for internal classes and results in dead code, because duplicating methods is required to add parameters where there is only one usage of the constructor/method.  The interface for the Pig loader is imposed by LoadFunc and StoreFunc from the Pig project, and the implementation's internals should not be used directly.

Author: Daniel Weeks <dw...@netflix.com>

Closes #12 from dcw-netflix/column-index-access and squashes the following commits:

1b5c5cf [Daniel Weeks] Refactored based on review comments
12b53c1 [Daniel Weeks] Fixed some formatting and the missing filter method sig
e5553f1 [Daniel Weeks] Adding back default constructor to satisfy other project requirements
69d21e0 [Daniel Weeks] Merge branch 'master' into column-index-access
f725c6f [Daniel Weeks] Removed enforcer for pig support
d182dc6 [Daniel Weeks] Introduces column index access
1c3c0c7 [Daniel Weeks] Fixed test with strict checking off
f3cb495 [Daniel Weeks] Added type coercion for primitive types with a flag to control strict type checking for conflicting schemas (strict by default).


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/17864dfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/17864dfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/17864dfc

Branch: refs/heads/master
Commit: 17864dfc0711d52d5af330469a1c2bd76128d46e
Parents: 4a07b3f
Author: Daniel Weeks <dw...@netflix.com>
Authored: Mon Jul 28 18:07:07 2014 -0700
Committer: julien <ju...@twitter.com>
Committed: Mon Jul 28 18:07:07 2014 -0700

----------------------------------------------------------------------
 parquet-pig/pom.xml                             |   3 -
 .../main/java/parquet/pig/ParquetLoader.java    |  55 +++++++-
 .../java/parquet/pig/PigSchemaConverter.java    | 128 ++++++++++++++++---
 .../main/java/parquet/pig/TupleReadSupport.java |  43 +++++--
 .../java/parquet/pig/TupleWriteSupport.java     |   2 +-
 .../java/parquet/pig/convert/MapConverter.java  |  10 +-
 .../parquet/pig/convert/TupleConverter.java     |  37 ++++--
 .../pig/convert/TupleRecordMaterializer.java    |   6 +-
 .../java/parquet/pig/TestParquetLoader.java     |  74 +++++++++++
 .../parquet/pig/TestPigSchemaConverter.java     |   2 +-
 10 files changed, 306 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index ed825ae..ad30441 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -96,9 +96,6 @@
   <build>
     <plugins>
       <plugin>
-        <artifactId>maven-enforcer-plugin</artifactId>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
index e658fee..91d68bd 100644
--- a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
@@ -21,7 +21,10 @@ import static parquet.Log.DEBUG;
 import static parquet.hadoop.util.ContextUtil.getConfiguration;
 import static parquet.pig.PigSchemaConverter.parsePigSchema;
 import static parquet.pig.PigSchemaConverter.pigSchemaToString;
+import static parquet.pig.PigSchemaConverter.serializeRequiredFieldList;
 import static parquet.pig.TupleReadSupport.PARQUET_PIG_SCHEMA;
+import static parquet.pig.TupleReadSupport.PARQUET_PIG_REQUIRED_FIELDS;
+import static parquet.pig.TupleReadSupport.PARQUET_COLUMN_INDEX_ACCESS;
 import static parquet.pig.TupleReadSupport.getPigSchemaFromMultipleFiles;
 
 import java.io.IOException;
@@ -71,12 +74,14 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache = new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>();
 
   private Schema requestedSchema;
+  private boolean columnIndexAccess;
 
   private String location;
   private boolean setLocationHasBeenCalled = false;
   private RecordReader<Void, Tuple> reader;
   private ParquetInputFormat<Tuple> parquetInputFormat;
   private Schema schema;
+  private RequiredFieldList requiredFieldList = null;
   protected String signature;
 
   /**
@@ -91,7 +96,40 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
    * @param requestedSchemaStr a subset of the original pig schema in the file
    */
   public ParquetLoader(String requestedSchemaStr) {
-    this.requestedSchema = parsePigSchema(requestedSchemaStr);
+    this(parsePigSchema(requestedSchemaStr), false);
+  }
+  
+  /**
+   * Reads only a subset of the columns in the file, optionally addressed by
+   * column position.  Using column positions allows the fields to be renamed
+   * and is more in line with the "schema-on-read" approach to accessing file
+   * data.
+   * 
+   * Example: 
+   * File Schema:  'c1:int, c2:float, c3:double, c4:long'
+   * ParquetLoader('n1:int, n2:float, n3:double, n4:long', 'true');
+   * 
+   * This will use the names provided in the requested schema and assign them
+   * to column positions indicated by order.
+   * 
+   * @param requestedSchemaStr a subset of the original pig schema in the file
+   * @param columnIndexAccess use column index positions as opposed to names (default: false)
+   */
+  public ParquetLoader(String requestedSchemaStr, String columnIndexAccess) {
+    this(parsePigSchema(requestedSchemaStr), Boolean.parseBoolean(columnIndexAccess));
+  }
+  
+  /**
+   * Use the provided schema to access the underlying file data.
+   * 
+   * The same as the string based constructor but for programmatic use.
+   * 
+   * @param requestedSchema a subset of the original pig schema in the file
+   * @param columnIndexAccess whether to resolve columns by index position rather than by name
+   */
+  public ParquetLoader(Schema requestedSchema, boolean columnIndexAccess) {
+    this.requestedSchema = requestedSchema;
+    this.columnIndexAccess = columnIndexAccess;
   }
 
   @Override
@@ -99,6 +137,14 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
     if (DEBUG) LOG.debug("LoadFunc.setLocation(" + location + ", " + job + ")");
     setInput(location, job);
     getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
+    
+    if(requiredFieldList != null) {
+      getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
+    }
+    
+    if(this.columnIndexAccess) {
+      getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
+    }
   }
 
   private void setInput(String location, Job job) throws IOException {
@@ -194,6 +240,8 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
       return;
     }
     schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
+    requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
+    columnIndexAccess = columnIndexAccess || Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
     if (schema == null && requestedSchema != null) {
       // this is only true in front-end
       schema = requestedSchema;
@@ -275,10 +323,15 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
   @Override
   public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
       throws FrontendException {
+    this.requiredFieldList = requiredFieldList;
+    
     if (requiredFieldList == null)
       return null;
+    
     schema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
     storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
+    storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
+    
     return new RequiredFieldResponse(true);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/PigSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/PigSchemaConverter.java b/parquet-pig/src/main/java/parquet/pig/PigSchemaConverter.java
index 8b3f02f..bfa7ac3 100644
--- a/parquet-pig/src/main/java/parquet/pig/PigSchemaConverter.java
+++ b/parquet-pig/src/main/java/parquet/pig/PigSchemaConverter.java
@@ -15,20 +15,24 @@
  */
 package parquet.pig;
 
-import static parquet.Log.DEBUG;
-
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
-
 import parquet.Log;
+import static parquet.Log.DEBUG;
+import static parquet.pig.TupleReadSupport.PARQUET_PIG_REQUIRED_FIELDS;
+
 import parquet.schema.ConversionPatterns;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
@@ -53,6 +57,19 @@ import parquet.schema.Type.Repetition;
  */
 public class PigSchemaConverter {
   private static final Log LOG = Log.getLog(PigSchemaConverter.class);
+  private ColumnAccess columnAccess;
+
+  public PigSchemaConverter() {
+    this(false);
+  }
+  
+  /**
+   * 
+   * @param columnIndexAccess toggle between name and index based access (default: false)
+   */
+  public PigSchemaConverter(boolean columnIndexAccess) {
+    this.columnAccess = columnIndexAccess ? new ColumnIndexAccess() : new ColumnNameAccess();
+  }
 
   /**
    * @param pigSchemaString the pig schema to parse
@@ -66,6 +83,58 @@ public class PigSchemaConverter {
     }
   }
 
+  interface ColumnAccess {
+    List<Type> filterTupleSchema(GroupType schemaToFilter, Schema pigSchema, RequiredFieldList requiredFieldsList);
+  }
+  
+  class ColumnIndexAccess implements ColumnAccess {
+    @Override
+    public List<Type> filterTupleSchema(GroupType schemaToFilter, Schema pigSchema, RequiredFieldList requiredFieldsList) {
+      List<Type> newFields = new ArrayList<Type>();
+      List<Pair<FieldSchema,Integer>> indexedFields = new ArrayList<Pair<FieldSchema,Integer>>();
+      
+      try {
+        if(requiredFieldsList == null) {
+          int index = 0;
+          for(FieldSchema fs : pigSchema.getFields()) {
+            indexedFields.add(new Pair<FieldSchema, Integer>(fs, index++));
+          }
+        } else {
+          for(RequiredField rf : requiredFieldsList.getFields()) {
+            indexedFields.add(new Pair<FieldSchema, Integer>(pigSchema.getField(rf.getAlias()), rf.getIndex()));
+          }
+        }
+
+        for (Pair<FieldSchema, Integer> p : indexedFields) {
+          FieldSchema fieldSchema = pigSchema.getField(p.first.alias);
+          if (p.second < schemaToFilter.getFieldCount()) {
+            Type type = schemaToFilter.getFields().get(p.second);
+            newFields.add(filter(type, fieldSchema));
+          }
+        }
+      } catch (FrontendException e) {
+          throw new RuntimeException("Failed to filter requested fields", e);
+      }  
+      return newFields;
+    } 
+  }
+  
+  class ColumnNameAccess implements ColumnAccess {
+    @Override
+    public List<Type> filterTupleSchema(GroupType schemaToFilter, Schema requestedPigSchema, RequiredFieldList requiredFieldsList) {
+      List<FieldSchema> fields = requestedPigSchema.getFields();
+      List<Type> newFields = new ArrayList<Type>();
+      for (int i = 0; i < fields.size(); i++) {
+        FieldSchema fieldSchema = fields.get(i);
+        String name = name(fieldSchema.alias, "field_"+i);
+        if (schemaToFilter.containsField(name)) {
+          newFields.add(filter(schemaToFilter.getType(name), fieldSchema));
+        }
+      }
+      return newFields;
+    }
+  }
+  
   /**
    * @param pigSchema the pig schema to turn into a string representation
    * @return the string representation of the schema
@@ -75,6 +144,26 @@ public class PigSchemaConverter {
     return pigSchemaString.substring(1, pigSchemaString.length() - 1);
   }
 
+  public static RequiredFieldList deserializeRequiredFieldList(String requiredFieldString) {
+    if(requiredFieldString == null) {
+        return null;
+    }
+    
+    try {
+      return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to deserialize pushProjection", e);
+    }
+  }
+  
+  static String serializeRequiredFieldList(RequiredFieldList requiredFieldList) {
+    try {
+      return ObjectSerializer.serialize(requiredFieldList);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to serialize required fields.", e);
+    }
+  }
+  
   /**
    * converts a parquet schema into a pig schema
    * @param parquetSchema the parquet schema to convert to Pig schema
@@ -346,29 +435,26 @@ public class PigSchemaConverter {
    * @return the resulting filtered schema
    */
   public MessageType filter(MessageType schemaToFilter, Schema requestedPigSchema) {
+    return filter(schemaToFilter, requestedPigSchema, null);
+  }
+  
+  /**
+   * filters a Parquet schema based on a pig schema for projection
+   * @param schemaToFilter the schema to be filtered
+   * @param requestedPigSchema the pig schema to filter it with
+   * @param requiredFieldList projected required fields
+   * @return the resulting filtered schema
+   */
+  public MessageType filter(MessageType schemaToFilter, Schema requestedPigSchema, RequiredFieldList requiredFieldList) {
     try {
       if (DEBUG) LOG.debug("filtering schema:\n" + schemaToFilter + "\nwith requested pig schema:\n " + requestedPigSchema);
-      List<Type> result = filterTupleSchema(schemaToFilter, requestedPigSchema);
+      List<Type> result = columnAccess.filterTupleSchema(schemaToFilter, requestedPigSchema, requiredFieldList);
       if (DEBUG) LOG.debug("schema:\n" + schemaToFilter + "\nfiltered to:\n" + result);
       return new MessageType(schemaToFilter.getName(), result);
     } catch (RuntimeException e) {
       throw new RuntimeException("can't filter " + schemaToFilter + " with " + requestedPigSchema, e);
     }
-  }
-
-  private List<Type> filterTupleSchema(GroupType schemaToFilter, Schema requestedPigSchema) {
-    List<FieldSchema> fields = requestedPigSchema.getFields();
-    List<Type> newFields = new ArrayList<Type>();
-    for (int i = 0; i < fields.size(); i++) {
-      FieldSchema fieldSchema = fields.get(i);
-      String name = name(fieldSchema.alias, "field_"+i);
-      if (schemaToFilter.containsField(name)) {
-        Type type = schemaToFilter.getType(name);
-        newFields.add(filter(type, fieldSchema));
-      }
-    }
-    return newFields;
-  }
+  }  
 
   private Type filter(Type type, FieldSchema fieldSchema) {
     if (DEBUG) LOG.debug("filtering type:\n" + type + "\nwith:\n " + fieldSchema);
@@ -392,7 +478,7 @@ public class PigSchemaConverter {
 
   private Type filterTuple(GroupType tupleType, FieldSchema tupleFieldSchema) throws FrontendException {
     if (DEBUG) LOG.debug("filtering TUPLE schema:\n" + tupleType + "\nwith:\n " + tupleFieldSchema);
-    return new GroupType(tupleType.getRepetition(), tupleType.getName(), tupleType.getOriginalType(), filterTupleSchema(tupleType, tupleFieldSchema.schema));
+    return new GroupType(tupleType.getRepetition(), tupleType.getName(), tupleType.getOriginalType(), columnAccess.filterTupleSchema(tupleType, tupleFieldSchema.schema, null));
   }
 
   private Type filterMap(GroupType mapType, FieldSchema mapFieldSchema) throws FrontendException {
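
To make the two access strategies concrete: with columnIndexAccess=true, the
requested field names are bound to file columns by position rather than by
name. A minimal sketch against the new filter overload (both schemas here are
hypothetical):

  MessageType fileSchema = MessageTypeParser.parseMessageType(
      "message m { required int32 c1; required float c2; }");
  Schema requested = Utils.getSchemaFromString("n1:int, n2:float");
  // n1/n2 resolve to positions 0/1 of the file schema; the names need not match.
  MessageType projected = new PigSchemaConverter(true).filter(fileSchema, requested, null);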

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
index 36b21ac..4076134 100644
--- a/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
+++ b/parquet-pig/src/main/java/parquet/pig/TupleReadSupport.java
@@ -17,17 +17,19 @@ package parquet.pig;
 
 import static parquet.pig.PigSchemaConverter.parsePigSchema;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-
+import org.apache.pig.impl.util.ObjectSerializer;
 import parquet.Log;
 import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
@@ -46,10 +48,12 @@ import parquet.schema.MessageType;
  */
 public class TupleReadSupport extends ReadSupport<Tuple> {
   static final String PARQUET_PIG_SCHEMA = "parquet.pig.schema";
+  static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.private.pig.column.index.access";
+  static final String PARQUET_PIG_REQUIRED_FIELDS = "parquet.private.pig.required.fields";
   static final String PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE = "parquet.pig.elephantbird.compatible";
   private static final Log LOG = Log.getLog(TupleReadSupport.class);
 
-  private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter();
+  private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter(false);
 
   /**
    * @param configuration the configuration for the current job
@@ -60,6 +64,24 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
   }
 
   /**
+   * @param configuration configuration for the current job
+   * @return List of required fields from pushProjection
+   */
+  static RequiredFieldList getRequiredFields(Configuration configuration) {
+    String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
+    
+    if(requiredFieldString == null) {
+      return null;
+    }
+    
+    try {
+      return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to deserialize pushProjection", e);
+    }
+  }
+  
+  /**
    * @param fileSchema the parquet schema from the file
    * @param keyValueMetaData the extra meta data from the files
    * @return the pig schema according to the file
@@ -131,15 +153,16 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
 
   @Override
   public ReadContext init(InitContext initContext) {
-    Schema requestedPigSchema = getPigSchema(initContext.getConfiguration());
-    if (requestedPigSchema == null) {
+    Schema pigSchema = getPigSchema(initContext.getConfiguration());
+    RequiredFieldList requiredFields = getRequiredFields(initContext.getConfiguration());
+    boolean columnIndexAccess = initContext.getConfiguration().getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
+    
+    if (pigSchema == null) {
       return new ReadContext(initContext.getFileSchema());
     } else {
+      
       // project the file schema according to the requested Pig schema
-      MessageType parquetRequestedSchema =
-          pigSchemaConverter.filter(
-          initContext.getFileSchema(),
-          requestedPigSchema);
+      MessageType parquetRequestedSchema = new PigSchemaConverter(columnIndexAccess).filter(initContext.getFileSchema(), pigSchema, requiredFields);
       return new ReadContext(parquetRequestedSchema);
     }
   }
@@ -152,14 +175,16 @@ public class TupleReadSupport extends ReadSupport<Tuple> {
       ReadContext readContext) {
     MessageType requestedSchema = readContext.getRequestedSchema();
     Schema requestedPigSchema = getPigSchema(configuration);
+    
     if (requestedPigSchema == null) {
       throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf");
     }
     boolean elephantBirdCompatible = configuration.getBoolean(PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE, false);
+    boolean columnIndexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
     if (elephantBirdCompatible) {
       LOG.info("Numbers will default to 0 instead of NULL; Boolean will be converted to Int");
     }
-    return new TupleRecordMaterializer(requestedSchema, requestedPigSchema, elephantBirdCompatible);
+    return new TupleRecordMaterializer(requestedSchema, requestedPigSchema, elephantBirdCompatible, columnIndexAccess);
   }
 
 }
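
For reference, init() above keys off two configuration entries that
ParquetLoader sets on the front-end. A sketch of the relevant knobs (values
illustrative; setBoolean is equivalent to the Boolean.toString value the
loader writes):

  Configuration conf = ContextUtil.getConfiguration(job);
  conf.set("parquet.pig.schema", "n1:int, n2:float");                // PARQUET_PIG_SCHEMA
  conf.setBoolean("parquet.private.pig.column.index.access", true);  // PARQUET_COLUMN_INDEX_ACCESS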

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java
index 54de927..8822869 100644
--- a/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java
+++ b/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java
@@ -45,7 +45,7 @@ import parquet.schema.Type;
 
 public class TupleWriteSupport extends WriteSupport<Tuple> {
   private static final TupleFactory TF = TupleFactory.getInstance();
-  private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter();
+  private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter(false);
 
   public static TupleWriteSupport fromPigSchema(String pigSchemaString) throws ParserException {
     return new TupleWriteSupport(Utils.getSchemaFromString(pigSchemaString));

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java b/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java
index 014ebe3..2827239 100644
--- a/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java
+++ b/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java
@@ -53,12 +53,12 @@ final class MapConverter extends GroupConverter {
   private Object currentKey;
   private Object currentValue;
 
-  MapConverter(GroupType parquetSchema, FieldSchema pigSchema, ParentValueContainer parent, boolean numbersDefaultToZero) throws FrontendException {
+  MapConverter(GroupType parquetSchema, FieldSchema pigSchema, ParentValueContainer parent, boolean numbersDefaultToZero, boolean columnIndexAccess) throws FrontendException {
     if (parquetSchema.getFieldCount() != 1) {
       throw new IllegalArgumentException("maps have only one field. " + parquetSchema);
     }
     this.parent = parent;
-    keyValue = new MapKeyValueConverter(parquetSchema.getType(0).asGroupType(), pigSchema.schema.getField(0), numbersDefaultToZero);
+    keyValue = new MapKeyValueConverter(parquetSchema.getType(0).asGroupType(), pigSchema.schema.getField(0), numbersDefaultToZero, columnIndexAccess);
   }
 
   @Override
@@ -129,7 +129,7 @@ final class MapConverter extends GroupConverter {
     private final Converter keyConverter;
     private final Converter valueConverter;
 
-    MapKeyValueConverter(GroupType parquetSchema, Schema.FieldSchema pigSchema, boolean numbersDefaultToZero) {
+    MapKeyValueConverter(GroupType parquetSchema, Schema.FieldSchema pigSchema, boolean numbersDefaultToZero, boolean columnIndexAccess) {
       if (parquetSchema.getFieldCount() != 2
           || !parquetSchema.getType(0).getName().equals("key")
           || !parquetSchema.getType(1).getName().equals("value")) {
@@ -141,7 +141,7 @@ final class MapConverter extends GroupConverter {
         void add(Object value) {
           currentKey = value;
         }
-      }, numbersDefaultToZero);
+      }, numbersDefaultToZero, columnIndexAccess);
       } catch (FrontendException fe) {
         throw new SchemaConversionException("can't convert keytype "+ parquetSchema.getType(0), fe);
       }
@@ -149,7 +149,7 @@ final class MapConverter extends GroupConverter {
         void add(Object value) {
           currentValue = value;
         }
-      }, numbersDefaultToZero);
+      }, numbersDefaultToZero, columnIndexAccess);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
index eb51b5f..bea6458 100644
--- a/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
+++ b/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java
@@ -62,7 +62,7 @@ public class TupleConverter extends GroupConverter {
 
   private final boolean elephantBirdCompatible;
 
-  public TupleConverter(GroupType parquetSchema, Schema pigSchema, boolean elephantBirdCompatible) {
+  public TupleConverter(GroupType parquetSchema, Schema pigSchema, boolean elephantBirdCompatible, boolean columnIndexAccess) {
     this.parquetSchema = parquetSchema;
     this.elephantBirdCompatible = elephantBirdCompatible;
     try {
@@ -70,17 +70,18 @@ public class TupleConverter extends GroupConverter {
       this.converters = new Converter[this.schemaSize];
       for (int i = 0, c = 0; i < schemaSize; i++) {
         FieldSchema field = pigSchema.getField(i);
-        Type type = null;
-        if(parquetSchema.containsField(field.alias)) {
-            type = parquetSchema.getType(parquetSchema.getFieldIndex(field.alias));
-            
+        if(parquetSchema.containsField(field.alias) || columnIndexAccess) {
+          Type type = getType(columnIndexAccess, field.alias, i);
+          
+          if(type != null) {
             final int index = i;
             converters[c++] = newConverter(field, type, new ParentValueContainer() {
               @Override
               void add(Object value) {
                 TupleConverter.this.set(index, value);
               }
-            }, elephantBirdCompatible);
+            }, elephantBirdCompatible, columnIndexAccess);
+          }
         }
         
       }
@@ -89,15 +90,27 @@ public class TupleConverter extends GroupConverter {
     }
   }
 
-  static Converter newConverter(FieldSchema pigField, Type type, final ParentValueContainer parent, boolean elephantBirdCompatible) {
+  private Type getType(boolean columnIndexAccess, String alias, int index) {
+    if(columnIndexAccess) {
+      if(index < parquetSchema.getFieldCount()) {
+        return parquetSchema.getType(index);
+      }
+    } else {
+      return parquetSchema.getType(parquetSchema.getFieldIndex(alias));
+    }
+    
+    return null;
+  }
+  
+  static Converter newConverter(FieldSchema pigField, Type type, final ParentValueContainer parent, boolean elephantBirdCompatible, boolean columnIndexAccess) {
     try {
       switch (pigField.type) {
       case DataType.BAG:
-        return new BagConverter(type.asGroupType(), pigField, parent, elephantBirdCompatible);
+        return new BagConverter(type.asGroupType(), pigField, parent, elephantBirdCompatible, columnIndexAccess);
       case DataType.MAP:
-        return new MapConverter(type.asGroupType(), pigField, parent, elephantBirdCompatible);
+        return new MapConverter(type.asGroupType(), pigField, parent, elephantBirdCompatible, columnIndexAccess);
       case DataType.TUPLE:
-        return new TupleConverter(type.asGroupType(), pigField.schema, elephantBirdCompatible) {
+        return new TupleConverter(type.asGroupType(), pigField.schema, elephantBirdCompatible, columnIndexAccess) {
           @Override
           public void end() {
             super.end();
@@ -525,7 +538,7 @@ public class TupleConverter extends GroupConverter {
     private final Converter child;
     private final ParentValueContainer parent;
 
-    BagConverter(GroupType parquetSchema, FieldSchema pigSchema, ParentValueContainer parent, boolean numbersDefaultToZero) throws FrontendException {
+    BagConverter(GroupType parquetSchema, FieldSchema pigSchema, ParentValueContainer parent, boolean numbersDefaultToZero, boolean columnIndexAccess) throws FrontendException {
       this.parent = parent;
       if (parquetSchema.getFieldCount() != 1) {
         throw new IllegalArgumentException("bags have only one field. " + parquetSchema + " size = " + parquetSchema.getFieldCount());
@@ -551,7 +564,7 @@ public class TupleConverter extends GroupConverter {
           }};
         pigField = pigSchema.schema.getField(0);
       }
-      child = newConverter(pigField, nestedType, childsParent, numbersDefaultToZero);
+      child = newConverter(pigField, nestedType, childsParent, numbersDefaultToZero, columnIndexAccess);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/main/java/parquet/pig/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/convert/TupleRecordMaterializer.java b/parquet-pig/src/main/java/parquet/pig/convert/TupleRecordMaterializer.java
index 1715162..2dedea1 100644
--- a/parquet-pig/src/main/java/parquet/pig/convert/TupleRecordMaterializer.java
+++ b/parquet-pig/src/main/java/parquet/pig/convert/TupleRecordMaterializer.java
@@ -27,7 +27,11 @@ public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
   private TupleConverter root;
 
   public TupleRecordMaterializer(GroupType parquetSchema, Schema pigSchema, boolean numbersDefaultToZero) {
-    this.root = new TupleConverter(parquetSchema, pigSchema, numbersDefaultToZero);
+    this(parquetSchema, pigSchema, numbersDefaultToZero, false);
+  }
+  
+  public TupleRecordMaterializer(GroupType parquetSchema, Schema pigSchema, boolean numbersDefaultToZero, boolean columnIndexAccess) {
+    this.root = new TupleConverter(parquetSchema, pigSchema, numbersDefaultToZero, columnIndexAccess);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
index e5c08af..e8bf9a3 100644
--- a/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
+++ b/parquet-pig/src/test/java/parquet/pig/TestParquetLoader.java
@@ -251,6 +251,80 @@ public class TestParquetLoader {
   }
   
   @Test
+  public void testColumnIndexAccess() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL); 
+    pigServer.setValidateEachStatement(true);
+    String out = "target/out";
+    int rows = 10;
+    Data data = Storage.resetData(pigServer);
+    List<Tuple> list = new ArrayList<Tuple>();
+    for (int i = 0; i < rows; i++) {
+      list.add(Storage.tuple(i, i*1.0, i*2L, "v"+i));
+    }
+    data.set("in", "c1:int, c2:double, c3:long, c4:chararray", list);
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+    pigServer.deleteFile(out);
+    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.executeBatch();
+      
+    //Test Null Padding at the end 
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int, n2:double, n3:long, n4:chararray', 'true');");
+    pigServer.registerQuery("STORE B into 'out' using mock.Storage();");
+    pigServer.executeBatch();
+    
+    List<Tuple> actualList = data.get("out");
+    
+    assertEquals(rows, actualList.size());
+    for(int i = 0; i < rows; i++) {
+      Tuple t = actualList.get(i);
+      
+      assertEquals(4, t.size());
+      
+      assertEquals(i, t.get(0));
+      assertEquals(i*1.0, t.get(1));
+      assertEquals(i*2L, t.get(2));
+      assertEquals("v"+i, t.get(3));
+    }
+  }
+  
+  @Test
+  public void testColumnIndexAccessProjection() throws Exception {
+    PigServer pigServer = new PigServer(ExecType.LOCAL); 
+    pigServer.setValidateEachStatement(true);
+    String out = "target/out";
+    int rows = 10;
+    Data data = Storage.resetData(pigServer);
+    List<Tuple> list = new ArrayList<Tuple>();
+    for (int i = 0; i < rows; i++) {
+      list.add(Storage.tuple(i, i*1.0, i*2L, "v"+i));
+    }
+    data.set("in", "c1:int, c2:double, c3:long, c4:chararray", list);
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+    pigServer.deleteFile(out);
+    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.executeBatch();
+    
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int, n2:double, n3:long, n4:chararray', 'true');");
+    pigServer.registerQuery("C = foreach B generate n1, n3;");
+    pigServer.registerQuery("STORE C into 'out' using mock.Storage();");
+    pigServer.executeBatch();
+    
+    List<Tuple> actualList = data.get("out");
+    
+    assertEquals(rows, actualList.size());
+    for(int i = 0; i < rows; i++) {
+      Tuple t = actualList.get(i);
+      
+      assertEquals(2, t.size());
+      
+      assertEquals(i, t.get(0));
+      assertEquals(i*2L, t.get(1));
+    }
+  }  
+  
+  @Test
   public void testRead() {
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/17864dfc/parquet-pig/src/test/java/parquet/pig/TestPigSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/parquet/pig/TestPigSchemaConverter.java b/parquet-pig/src/test/java/parquet/pig/TestPigSchemaConverter.java
index 3cd7977..e95150f 100644
--- a/parquet-pig/src/test/java/parquet/pig/TestPigSchemaConverter.java
+++ b/parquet-pig/src/test/java/parquet/pig/TestPigSchemaConverter.java
@@ -75,7 +75,7 @@ public class TestPigSchemaConverter {
     MessageType expectedMT = MessageTypeParser.parseMessageType(schemaString);
     assertEquals("converting "+pigSchemaString+" to "+schemaString, expectedMT, schema);
 
-    MessageType filtered = pigSchemaConverter.filter(schema, pigSchema);
+    MessageType filtered = pigSchemaConverter.filter(schema, pigSchema, null);
     assertEquals("converting "+pigSchemaString+" to "+schemaString+" and filtering", schema.toString(), filtered.toString());
   }