Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2010/03/01 23:51:37 UTC

[Pig Wiki] Update of "LoadStoreRedesignProposal" by PradeepKamath

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The "LoadStoreRedesignProposal" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=40&rev2=41

--------------------------------------------------

  
  For load functions:
  
-  * '''!LoadFunc''' will be pared down to just contain functions directly associated with reading data, such as getNext.
+  * '''!LoadFunc''' will be pared down to just contain functions directly associated with reading data, such as getNext. '''!LoadFunc''' will now be an abstract class with default implementations for some of the methods.
   * A new '''!LoadCaster''' interface will be added.  This interface will contain all of the bytesToX methods currently in !LoadFunc.  !LoadFunc will add a `getLoadCaster` routine that will return an object that can provide casts.  The existing !Utf8StorageConverter class will change to implement this interface.  Load functions will then be free to use this class as their caster, or provide their own.  Existing load functions that provide all of the bytesToX methods can implement the !LoadCaster interface and return themselves from the `getLoadCaster` routine.  If a loader does not provide a !LoadCaster, casts from byte array to other Pig types will not be supported for data loaded via that loader (a sketch after this list illustrates how these optional capabilities are discovered).
   * A new '''!LoadMetadata''' interface will be added.  Calls that find metadata about the data being loaded, such as determineSchema, will be placed in this interface.  If a loader does not implement this interface, then Pig will assume that no metadata is obtainable for this data.
   * A new '''!LoadPushDown''' interface will be added.  Calls that determine what can be pushed down and pushing that functionality down into the loader will be placed in this interface.  If a loader does not implement this interface, then Pig will assume that the loader is not capable of pushing down any functionality.
@@ -29, +29 @@

   * A new interface '''!StoreMetadata''' will be added to provide a way for storage functions to record metadata.  If a given storage function does not implement this interface, Pig will assume that it is unable to record metadata.
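 
 Since the new capabilities are expressed as optional interfaces, Pig (or any caller) can discover what a given loader supports with plain `instanceof` checks, and by testing whether `getLoadCaster()` returns null. The following is an illustrative sketch only: the `org.apache.pig` package locations are assumed and the helper class is hypothetical.
 
 {{{
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.LoadPushDown.OperatorSet;
 
 // Hypothetical helper showing how the optional interfaces are meant to be
 // discovered: a loader advertises a capability simply by implementing the
 // corresponding interface.
 public class LoaderCapabilities {
 
     public static void describe(LoadFunc loader) throws IOException {
         LoadCaster caster = loader.getLoadCaster();
         if (caster == null) {
             System.out.println("casts from bytearray are not supported by this loader");
         }
         if (loader instanceof LoadMetadata) {
             System.out.println("loader can supply schema/statistics/partition metadata");
         }
         if (loader instanceof LoadPushDown) {
             List<OperatorSet> features = ((LoadPushDown) loader).getFeatures();
             System.out.println("loader supports push down of: " + features);
         }
     }
 }
 }}}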
  
  === Details ===
- '''!LoadFunc'''
+ '''!LoadFunc''' 
- 
+ (For brevity, the implementation of the getAbsolutePath() method is not shown below.)
  {{{
+ 
  /**
-  * This interface is used to implement functions to parse records
-  * from a dataset.
+  * <code>LoadFunc</code> provides functions directly associated with reading 
+  * records from a data set.
   */
- public interface LoadFunc {
+ public abstract class LoadFunc {
+     
      /**
       * This method is called by the Pig runtime in the front end to convert the
       * input location to an absolute path if the location is relative. The
-      * loadFunc implementation is free to choose how it converts a relative
+      * loadFunc implementation is free to choose how it converts a relative 
       * location to an absolute location since this may depend on what the location
       * string represents (an HDFS path or some other data source).
-      *
+      * 
       * @param location location as provided in the "load" statement of the script
       * @param curDir the current working directory based on any "cd" statements
       * in the script before the "load" statement. If there are no "cd" statements
-      * in the script, this would be the home directory -
+      * in the script, this would be the home directory - 
       * <pre>/user/<username> </pre>
       * @return the absolute location based on the arguments passed
       * @throws IOException if the conversion is not possible
       */
-     String relativeToAbsolutePath(String location, Path curDir) throws IOException;
+     public String relativeToAbsolutePath(String location, Path curDir) 
+             throws IOException {      
+         return getAbsolutePath(location, curDir);
+     }    
  
- 
      /**
-      * Communicate to the loader the location of the object(s) being loaded.
+      * Communicate to the loader the location of the object(s) being loaded.  
-      * The location string passed to the LoadFunc here is the return value of
+      * The location string passed to the LoadFunc here is the return value of 
-      * {@link LoadFunc#relativeToAbsolutePath(String, String)}
+      * {@link LoadFunc#relativeToAbsolutePath(String, Path)}. Implementations
+      * should use this method to communicate the location (and any other information)
+      * to its underlying InputFormat through the Job object.
-      *
+      * 
       * This method will be called in the backend multiple times. Implementations
       * should bear in mind that this method is called multiple times and should
       * ensure there are no inconsistent side effects due to the multiple calls.
-      *
+      * 
-      * @param location Location as returned by
+      * @param location Location as returned by 
-      * {@link LoadFunc#relativeToAbsolutePath(String, String)}.
+      * {@link LoadFunc#relativeToAbsolutePath(String, Path)}
       * @param job the {@link Job} object
+      * which can be used to store or retrieve earlier stored information from the {@link UDFContext}
       * @throws IOException if the location is not valid.
       */
-     void setLocation(String location, Job job) throws IOException;
+     public abstract void setLocation(String location, Job job) throws IOException;
- 
+     
      /**
       * This will be called during planning on the front end. This is the
-      * instance of InputFormat (rather than the class name) because the
+      * instance of InputFormat (rather than the class name) because the 
-      * load function may need to instantiate the InputFormat in order
+      * load function may need to instantiate the InputFormat in order 
       * to control how it is constructed.
       * @return the InputFormat associated with this loader.
-      * @throws IOException if there is an exception during InputFormat
+      * @throws IOException if there is an exception during InputFormat 
       * construction
       */
+     @SuppressWarnings("unchecked")
-     InputFormat getInputFormat() throws IOException;
+     public abstract InputFormat getInputFormat() throws IOException;
  
      /**
-      * This will be called on the front end during planning and not on the back
+      * This will be called on the front end during planning and not on the back 
       * end during execution.
-      * @return the {@link LoadCaster} associated with this loader. Returning null
+      * @return the {@link LoadCaster} associated with this loader. Returning null 
-      * indicates that casts from byte array are not supported for this loader.
+      * indicates that casts from byte array are not supported for this loader. 
-      * @throws IOException if there is an exception during LoadCaster
+      * @throws IOException if there is an exception during LoadCaster 
       * construction
       */
-     LoadCaster getLoadCaster() throws IOException;
+     public LoadCaster getLoadCaster() throws IOException {
+         return new Utf8StorageConverter();
+     }
  
      /**
       * Initializes LoadFunc for reading data.  This will be called during execution
@@ -100, +110 @@

       * @param split The input {@link PigSplit} to process
       * @throws IOException if there is an exception during initialization
       */
+     @SuppressWarnings("unchecked")
-     void prepareToRead(RecordReader reader, PigSplit split) throws IOException;
+     public abstract void prepareToRead(RecordReader reader, PigSplit split) throws IOException;
  
      /**
-      * Retrieves the next tuple to be processed.
+      * Retrieves the next tuple to be processed. Implementations should NOT reuse
+      * tuple objects (or inner member objects) they return across calls and 
+      * should return a different tuple object in each call.
       * @return the next tuple to be processed or null if there are no more tuples
       * to be processed.
       * @throws IOException if there is an exception while retrieving the next
       * tuple
       */
-     Tuple getNext() throws IOException;
+     public abstract Tuple getNext() throws IOException;
  
+     /**
+      * This method will be called by Pig both in the front end and back end to
+      * pass a unique signature to the {@link LoadFunc}. The signature can be used
+      * to store into the {@link UDFContext} any information which the 
+      * {@link LoadFunc} needs to store between various method invocations in the
+      * front end and back end. A use case is to store {@link RequiredFieldList} 
+      * passed to it in {@link LoadPushDown#pushProjection(RequiredFieldList)} for
+      * use in the back end before returning tuples in {@link LoadFunc#getNext()}.
+      * This method will be called before other methods in {@link LoadFunc}.
+      * @param signature a unique signature to identify this LoadFunc
+      */
+     public void setUDFContextSignature(String signature) {
+         // default implementation is a no-op
+     }
+        
  }
- }}}
- The '''!LoadCaster''' interface will include bytesToInt, bytesToLong, etc. functions currently in !LoadFunc.  UTF8!StorageConverter will implement this interface.
  
- '''Open Question''': Should the methods to convert to a Bag, Tuple and Map take a Schema (ResourceSchema?) argument?
+ 
+ }}}
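 
 To make the contract concrete, here is a minimal sketch of a loader written against the abstract class above. It is not part of the proposal: the class name `LineLoader` is made up, it leans on Hadoop's TextInputFormat, the import locations reflect the then-current Pig trunk layout and are assumptions, and it simply returns each input line as a single-field tuple.
 
 {{{
 import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
 public class LineLoader extends LoadFunc {
 
     private RecordReader reader;
     private final TupleFactory tupleFactory = TupleFactory.getInstance();
 
     @Override
     public void setLocation(String location, Job job) throws IOException {
         // Pass the (already absolute) location on to the underlying InputFormat.
         FileInputFormat.setInputPaths(job, location);
     }
 
     @Override
     public InputFormat getInputFormat() throws IOException {
         return new TextInputFormat();
     }
 
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
         this.reader = reader;
     }
 
     @Override
     public Tuple getNext() throws IOException {
         try {
             if (!reader.nextKeyValue()) {
                 return null;   // no more input
             }
             Text line = (Text) reader.getCurrentValue();
             // Return a fresh tuple on every call, per the contract above.
             return tupleFactory.newTuple(Arrays.asList(line.toString()));
         } catch (InterruptedException e) {
             throw new IOException(e);
         }
     }
 }
 }}}
 
 Note that `getNext()` builds a new tuple on every call and that the default relativeToAbsolutePath, getLoadCaster and setUDFContextSignature implementations are inherited from the abstract class.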
+ The '''!LoadCaster''' interface will include the bytesToInteger, bytesToLong, etc. functions currently in !LoadFunc.  !Utf8StorageConverter will implement this interface. The only change will be in bytesToTuple() and bytesToBag() - these methods will take an additional !ResourceFieldSchema argument describing the schema of the tuple or bag, respectively, that the bytes need to be cast to.
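 
 A sketch of what the revised interface could look like, inferred from the description above; the method names follow the conversion routines of the existing !LoadFunc, the import locations are assumptions, and only bytesToTuple and bytesToBag gain the schema argument:
 
 {{{
 import java.io.IOException;
 import java.util.Map;
 
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 
 public interface LoadCaster {
 
     Integer bytesToInteger(byte[] b) throws IOException;
     Long bytesToLong(byte[] b) throws IOException;
     Float bytesToFloat(byte[] b) throws IOException;
     Double bytesToDouble(byte[] b) throws IOException;
     String bytesToCharArray(byte[] b) throws IOException;
     Map<String, Object> bytesToMap(byte[] b) throws IOException;
 
     // Per the paragraph above, the complex-type conversions carry the schema
     // of the field the bytes are being cast to.
     Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException;
     DataBag bytesToBag(byte[] b, ResourceFieldSchema fieldSchema) throws IOException;
 }
 }}}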
  
  '''!LoadMetadata'''
  
@@ -198, +226 @@

          public Order[] sortKeyOrders;
      }
  }}}
- Feedback from Pradeep:  We must fix the two level access issues with schema of bags in current schema before we make these changes, otherwise that same contagion will afflict us here.
+ 
+ '''IMPORTANT NOTE''': In the !ResourceFieldSchema for a bag field, the only field allowed in the subschema is a tuple field. The tuple itself can have a schema with more than one field.
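 
 As a concrete illustration of that shape, here is a hedged sketch built with Pig's existing Schema/FieldSchema classes (used only because the !ResourceSchema API is still being finalized in this proposal); the aliases b, t, x and y are invented for the example. A field `b: bag{t: (x: int, y: chararray)}` has a subschema with exactly one tuple field, and that tuple's own schema holds the inner fields.
 
 {{{
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 public class BagSchemaExample {
 
     // Builds the schema for: b: bag{t: (x: int, y: chararray)}
     public static FieldSchema bagField() throws FrontendException {
         // Schema of the tuple inside the bag: (x: int, y: chararray)
         Schema tupleSchema = new Schema();
         tupleSchema.add(new FieldSchema("x", DataType.INTEGER));
         tupleSchema.add(new FieldSchema("y", DataType.CHARARRAY));
 
         // The bag's subschema contains exactly one field, and it is a tuple.
         Schema bagSubSchema = new Schema(new FieldSchema("t", tupleSchema, DataType.TUPLE));
 
         // The bag field itself.
         return new FieldSchema("b", bagSubSchema, DataType.BAG);
     }
 }
 }}}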
  
  '''!ResourceStatistics'''
  
@@ -239, +268 @@

   * be pushed into the loader.  If a given loader does not implement this interface
   * it will be assumed that it is unable to accept any functionality for push down.
   */
- interface LoadPushDown {
+ public interface LoadPushDown {
  
      /**
-      * Set of possible operations that Pig can push down to a loader.
+      * Set of possible operations that Pig can push down to a loader. 
       */
-     enum OperatorSet {PROJECTION, SELECTION};
+     enum OperatorSet {PROJECTION};
  
      /**
-      * Determine the operators that can be pushed to the loader.
+      * Determine the operators that can be pushed to the loader.  
       * Note that by indicating a loader can accept a certain operator
       * (such as selection) the loader is not promising that it can handle
-      * all selections.  When it is passed the actual operators to
+      * all selections.  When it is passed the actual operators to 
       * push down it will still have a chance to reject them.
       * @return list of all features that the loader can support
       */
      List<OperatorSet> getFeatures();
  
      /**
-      * Propose a set of operators to push to the loader.
-      * @param plan Plan containing proposed operators
-      * @return true if the loader accepts the plan, false if not.
-      * If false is returned Pig may choose to trim the plan and call
-      * this method again.
+      * Indicate to the loader which fields will be needed.  This can be useful for
+      * loaders that access data stored in a columnar format, where indicating the
+      * columns to be accessed ahead of time will save scans.  This method will
+      * not be invoked by the Pig runtime if all fields are required. So implementations
+      * should assume that if this method is not invoked, then all fields from 
+      * the input are required. If the loader function cannot make use of this 
+      * information, it is free to ignore it by returning an appropriate RequiredFieldResponse.
+      * @param requiredFieldList RequiredFieldList indicating which columns will be needed.
+      */
+     public RequiredFieldResponse pushProjection(RequiredFieldList 
+             requiredFieldList) throws FrontendException;
+     
+     public static class RequiredField implements Serializable {
+         
+         private static final long serialVersionUID = 1L;
+         
+         // will hold name of the field (would be null if not supplied)
+         private String alias; 
+ 
+         // will hold the index (position) of the required field (would be -1 if not supplied), index is 0 based
+         private int index; 
+ 
+         // A list of sub fields in this field (this could be a list of hash keys for example). 
+         // This would be null if the entire field is required and no specific sub fields are required. 
+         // In the initial implementation only one level of subfields will be populated.
+         private List<RequiredField> subFields;
+         
+         // Type of this field - the value could be any Pig DataType (as specified by the constants in the DataType class).
+         private byte type;
+ 
+         public RequiredField() {
+             // to allow piece-meal construction
+         }
+         
+         /**
+          * @param alias
+          * @param index
+          * @param subFields
+          * @param type
-      */
+          */
-     boolean pushOperators(OperatorPlan plan);
+         public RequiredField(String alias, int index,
+                 List<RequiredField> subFields, byte type) {
+             this.alias = alias;
+             this.index = index;
+             this.subFields = subFields;
+             this.type = type;
+         }
+ 
+         /**
+          * @return the alias
+          */
+         public String getAlias() {
+             return alias;
+         }
+ 
+         /**
+          * @return the index
+          */
+         public int getIndex() {
+             return index;
+         }
+ 
+         
+         /**
+          * @return the required sub fields. The return value is null if all
+          *         subfields are required
+          */
+         public List<RequiredField> getSubFields() {
+             return subFields;
+         }
+         
+         public void setSubFields(List<RequiredField> subFields)
+         {
+             this.subFields = subFields;
+         }
+ 
+         /**
+          * @return the type
+          */
+         public byte getType() {
+             return type;
+         }
+ 
+         public void setType(byte t) {
+             type = t;
+         }
+         
+         public void setIndex(int i) {
+             index = i;
+         }
+         
+         public void setAlias(String alias)
+         {
+             this.alias = alias;
+         }
+ 
+     }
+ 
+     public static class RequiredFieldList implements Serializable {
+         
+         private static final long serialVersionUID = 1L;
+         
+         // list of required fields; this will be null if all fields are required
+         private List<RequiredField> fields = new ArrayList<RequiredField>(); 
+         
+         /**
+          * @param fields
+          */
+         public RequiredFieldList(List<RequiredField> fields) {
+             this.fields = fields;
+         }
+ 
+         /**
+          * @return the required fields - this will be null if all fields are
+          *         required
+          */
+         public List<RequiredField> getFields() {
+             return fields;
+         }
+ 
+         public RequiredFieldList() {
+         }
+         
+         public void add(RequiredField rf)
+         {
+             fields.add(rf);
+         }
+     }
+ 
+     public static class RequiredFieldResponse {
+         // the loader should pass true if it will return data containing
+         // only the List of RequiredFields in that order. false if it
+         // will return all fields in the data
+         private boolean requiredFieldRequestHonored;
+ 
+         public RequiredFieldResponse(boolean requiredFieldRequestHonored) {
+             this.requiredFieldRequestHonored = requiredFieldRequestHonored;
+         }
+ 
+         // true if the loader will return data containing only the List of
+         // RequiredFields in that order. false if the loader will return all
+         // fields in the data
+         public boolean getRequiredFieldResponse() {
+             return requiredFieldRequestHonored;
+         }
+ 
+         // the loader should pass true if it will return data containing
+         // only the List of RequiredFields in that order. false if it
+         // will return all fields in the data
+         public void setRequiredFieldResponse(boolean honored) {
+             requiredFieldRequestHonored = honored;
+         }
+     }
+ 
+     
  }
  }}}
- An open question for !LoadPushdown is how do we communicate what needs to be pushed down?  In the above, !OperatorPlan is envisioned as a simple syntax tree that would be sufficient to communicate operators and their expressions to the storage functions.  Initially we considered using the !LogicalPlan as is.  But our !LogicalPlan is a mess right now.  It's also tightly tied to our implementation, so exposing it as an interface is unwise. We could use a SQL like string, but then loaders have to implement a parser.
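 
 For illustration, a hedged fragment of how a loader might honour a projection request and hand the field list to the back end via the UDFContext. The class name, the property key and the use of ObjectSerializer are assumptions and not part of the proposal; a real loader would also extend !LoadFunc and receive its signature through setUDFContextSignature.
 
 {{{
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 
 public class ProjectionAwareLoader implements LoadPushDown {
 
     // In a complete loader this would be set in LoadFunc.setUDFContextSignature().
     private String signature;
 
     public void setUDFContextSignature(String signature) {
         this.signature = signature;
     }
 
     @Override
     public List<OperatorSet> getFeatures() {
         // Only projection push down is supported.
         return Collections.singletonList(OperatorSet.PROJECTION);
     }
 
     @Override
     public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
             throws FrontendException {
         try {
             // Stash the (Serializable) field list under this loader's signature so
             // the back-end copy of the loader can read it back before getNext().
             Properties props = UDFContext.getUDFContext()
                     .getUDFProperties(this.getClass(), new String[] { signature });
             props.setProperty("pig.example.requiredFieldList",
                     ObjectSerializer.serialize(requiredFieldList));
         } catch (IOException e) {
             throw new FrontendException("could not serialize required field list", e);
         }
         // true: only the requested fields, in the requested order, will be returned.
         return new RequiredFieldResponse(true);
     }
 }
 }}}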
  
  '''!StoreFunc'''
  
@@ -681, +857 @@

  
   * Updated section on streaming to reflect the current implementation.
  
+ Mar 1 2010, Pradeep Kamath
+  * Updated interfaces/abstract classes to reflect current state
+