You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Vincent BARAT <vi...@ubikod.com> on 2009/09/14 15:38:23 UTC

Issue with LoadFunc & Slicer

Hello,

In the process of to trying to add the support for HBase 0.20.0 in 
PIG (trunk) I was trying the tutorial from PIG documentation:

http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer

Unfortunately, when I try:

A = LOAD '27' USING RangeSlicer();
dump A;

PIG reports the following error:

2009-09-14 15:33:46,395 [main] ERROR 
org.apache.pig.tools.grunt.Grunt - ERROR 2081: Unable to setup the 
load function.

If I provide an existing file, instead of '27', I no longer have 
this error, but the output of the dump function is empty.

Any idea ?


Here is my RangeSlicer() code:

=========================================================


package com.ubikod.ermin.backend.pigudfs;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RangeSlicer extends Utf8StorageConverter implements Slicer,
   LoadFunc
{
   private static final Log LOG = LogFactory.getLog(RangeSlicer.class);

   public RangeSlicer()
   {
     LOG.info("RangeSlicer");
   }

   /**
    * Expects location to be a Stringified integer, and makes
    * Integer.parseInt(location) slices. Each slice generates a 
single value, its
    * index in the sequence of slices.
    */
   public Slice[] slice(DataStorage store, String location) throws 
IOException
   {
     LOG.info("slice #################" + location);
     location = "30";
     // Note: validate has already made sure that location is an integer
     int numslices = Integer.parseInt(location);
     LOG.info("slice #################" + numslices);
     Slice[] slices = new Slice[numslices];
     for (int i = 0; i < slices.length; i++)
     {
       slices[i] = new SingleValueSlice(i);
     }
     return slices;
   }

   public void validate(DataStorage store, String location) throws 
IOException
   {
     try
     {
       LOG.info("validate #################" + location);
       Integer.parseInt("30");
       LOG.info("validate #################" + location);
     }
     catch (NumberFormatException nfe)
     {
       throw new IOException(nfe.getMessage());
     }
   }

   /**
    * A Slice that returns a single value from next.
    */
   public static class SingleValueSlice implements Slice
   {
     // note this value is set by the Slicer and will get serialized and
     // deserialized at the remote processing node
     public int val;
     // since we just have a single value, we can use a boolean 
rather than a
     // counter
     private transient boolean read;

     public SingleValueSlice(int value)
     {
       LOG.info("SingleValueSlice #################" + value);

       this.val = value;
     }

     public void close() throws IOException
     {
     }

     public long getLength()
     {
       return 1;
     }

     public String[] getLocations()
     {
       return new String[0];
     }

     public long getStart()
     {
       return 0;
     }

     public long getPos() throws IOException
     {
       return read ? 1 : 0;
     }

     public float getProgress() throws IOException
     {
       return read ? 1 : 0;
     }

     public void init(DataStorage store) throws IOException
     {
     }

     public boolean next(Tuple value) throws IOException
     {
       if (!read)
       {
         LOG.info("next #################" + value);

         value.append(val);
         read = true;
         return true;
       }
       return false;
     }

     private static final long serialVersionUID = 1L;
   }

   @Override
   public void bindTo(String arg0, BufferedPositionedInputStream arg1,
     long arg2, long arg3) throws IOException
   {
     LOG.info("bindTo #################" + arg0);
   }

   @Override
   public Schema determineSchema(String arg0, ExecType arg1, 
DataStorage arg2)
     throws IOException
   {
     // TODO Auto-generated method stub
     return null;
   }

   @Override
   public void fieldsToRead(Schema arg0)
   {
     // TODO Auto-generated method stub
   }

   @Override
   public Tuple getNext() throws IOException
   {
     // TODO Auto-generated method stub
     return null;
   }
}

Re: Issue with LoadFunc & Slicer

Posted by Alan Gates <ga...@yahoo-inc.com>.
Kevin,

Please take a look at the proposal for reworking load and store  
functions that was posted a couple of days ago and see if it will  
address your issues with plugability of load functions.

http://wiki.apache.org/pig/LoadStoreRedesignProposal

Alan.

On Sep 14, 2009, at 8:58 AM, Kevin Weil wrote:

> +1 on this.  I'm writing a bunch of LZO-based LoadFuncs/Slicers  
> (commit
> coming soon) and it's so much faster to test/verify in local mode.
>
> Btw, I believe there's already a ticket somewhere for this, but  
> while I'm at
> it: +1 on further separation of the LoadFunc/pig output from the  
> Slicer/disk
> reads.  Right now there are some one-offs in the Pig code to deal with
> gzipped file input.  Writing loaders to deal with Lzo (with indexed  
> blocks
> that mean shifting the InputSplits slightly) has meant copying a  
> bunch of
> the logic for parsing tuples from PigStorage, etc, because the lower  
> layer
> isn't as pluggable as it could be.
>
> Kevin
>
> On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dv...@gmail.com>  
> wrote:
>
>> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>>
>> Vote it up so that the pig developers have a record of user interest
>> in this feature.
>>
>> -D
>>
>> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
>> <vi...@ubikod.com> wrote:
>>> It seems that I got my answer: custom loader functions can only be  
>>> used
>> in
>>> map reduce mode, not local mode: in local mode, the file specified  
>>> must
>> be a
>>> real file.
>>>
>>> Vincent BARAT a écrit :
>>>>
>>>> Hello,
>>>>
>>>> In the process of to trying to add the support for HBase 0.20.0  
>>>> in PIG
>>>> (trunk) I was trying the tutorial from PIG documentation:
>>>>
>>>> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
>>>>
>>>> Unfortunately, when I try:
>>>>
>>>> A = LOAD '27' USING RangeSlicer();
>>>> dump A;
>>>>
>>>> PIG reports the following error:
>>>>
>>>> 2009-09-14 15:33:46,395 [main] ERROR  
>>>> org.apache.pig.tools.grunt.Grunt -
>>>> ERROR 2081: Unable to setup the load function.
>>>>
>>>> If I provide an existing file, instead of '27', I no longer have  
>>>> this
>>>> error, but the output of the dump function is empty.
>>>>
>>>> Any idea ?
>>>>
>>>>
>>>> Here is my RangeSlicer() code:
>>>>
>>>> =========================================================
>>>>
>>>>
>>>> package com.ubikod.ermin.backend.pigudfs;
>>>>
>>>> import java.io.IOException;
>>>>
>>>> import org.apache.commons.logging.Log;
>>>> import org.apache.commons.logging.LogFactory;
>>>> import org.apache.pig.ExecType;
>>>> import org.apache.pig.LoadFunc;
>>>> import org.apache.pig.Slice;
>>>> import org.apache.pig.Slicer;
>>>> import org.apache.pig.backend.datastorage.DataStorage;
>>>> import org.apache.pig.builtin.Utf8StorageConverter;
>>>> import org.apache.pig.data.Tuple;
>>>> import org.apache.pig.impl.io.BufferedPositionedInputStream;
>>>> import org.apache.pig.impl.logicalLayer.schema.Schema;
>>>>
>>>> public class RangeSlicer extends Utf8StorageConverter implements  
>>>> Slicer,
>>>> LoadFunc
>>>> {
>>>> private static final Log LOG =  
>>>> LogFactory.getLog(RangeSlicer.class);
>>>>
>>>> public RangeSlicer()
>>>> {
>>>>   LOG.info("RangeSlicer");
>>>> }
>>>>
>>>> /**
>>>>  * Expects location to be a Stringified integer, and makes
>>>>  * Integer.parseInt(location) slices. Each slice generates a single
>>>> value, its
>>>>  * index in the sequence of slices.
>>>>  */
>>>> public Slice[] slice(DataStorage store, String location) throws
>>>> IOException
>>>> {
>>>>   LOG.info("slice #################" + location);
>>>>   location = "30";
>>>>   // Note: validate has already made sure that location is an  
>>>> integer
>>>>   int numslices = Integer.parseInt(location);
>>>>   LOG.info("slice #################" + numslices);
>>>>   Slice[] slices = new Slice[numslices];
>>>>   for (int i = 0; i < slices.length; i++)
>>>>   {
>>>>     slices[i] = new SingleValueSlice(i);
>>>>   }
>>>>   return slices;
>>>> }
>>>>
>>>> public void validate(DataStorage store, String location) throws
>>>> IOException
>>>> {
>>>>   try
>>>>   {
>>>>     LOG.info("validate #################" + location);
>>>>     Integer.parseInt("30");
>>>>     LOG.info("validate #################" + location);
>>>>   }
>>>>   catch (NumberFormatException nfe)
>>>>   {
>>>>     throw new IOException(nfe.getMessage());
>>>>   }
>>>> }
>>>>
>>>> /**
>>>>  * A Slice that returns a single value from next.
>>>>  */
>>>> public static class SingleValueSlice implements Slice
>>>> {
>>>>   // note this value is set by the Slicer and will get serialized  
>>>> and
>>>>   // deserialized at the remote processing node
>>>>   public int val;
>>>>   // since we just have a single value, we can use a boolean rather
>> than
>>>> a
>>>>   // counter
>>>>   private transient boolean read;
>>>>
>>>>   public SingleValueSlice(int value)
>>>>   {
>>>>     LOG.info("SingleValueSlice #################" + value);
>>>>
>>>>     this.val = value;
>>>>   }
>>>>
>>>>   public void close() throws IOException
>>>>   {
>>>>   }
>>>>
>>>>   public long getLength()
>>>>   {
>>>>     return 1;
>>>>   }
>>>>
>>>>   public String[] getLocations()
>>>>   {
>>>>     return new String[0];
>>>>   }
>>>>
>>>>   public long getStart()
>>>>   {
>>>>     return 0;
>>>>   }
>>>>
>>>>   public long getPos() throws IOException
>>>>   {
>>>>     return read ? 1 : 0;
>>>>   }
>>>>
>>>>   public float getProgress() throws IOException
>>>>   {
>>>>     return read ? 1 : 0;
>>>>   }
>>>>
>>>>   public void init(DataStorage store) throws IOException
>>>>   {
>>>>   }
>>>>
>>>>   public boolean next(Tuple value) throws IOException
>>>>   {
>>>>     if (!read)
>>>>     {
>>>>       LOG.info("next #################" + value);
>>>>
>>>>       value.append(val);
>>>>       read = true;
>>>>       return true;
>>>>     }
>>>>     return false;
>>>>   }
>>>>
>>>>   private static final long serialVersionUID = 1L;
>>>> }
>>>>
>>>> @Override
>>>> public void bindTo(String arg0, BufferedPositionedInputStream arg1,
>>>>   long arg2, long arg3) throws IOException
>>>> {
>>>>   LOG.info("bindTo #################" + arg0);
>>>> }
>>>>
>>>> @Override
>>>> public Schema determineSchema(String arg0, ExecType arg1,  
>>>> DataStorage
>>>> arg2)
>>>>   throws IOException
>>>> {
>>>>   // TODO Auto-generated method stub
>>>>   return null;
>>>> }
>>>>
>>>> @Override
>>>> public void fieldsToRead(Schema arg0)
>>>> {
>>>>   // TODO Auto-generated method stub
>>>> }
>>>>
>>>> @Override
>>>> public Tuple getNext() throws IOException
>>>> {
>>>>   // TODO Auto-generated method stub
>>>>   return null;
>>>> }
>>>> }
>>>
>>


Re: Issue with LoadFunc & Slicer

Posted by Kevin Weil <ke...@gmail.com>.
+1 on this.  I'm writing a bunch of LZO-based LoadFuncs/Slicers (commit
coming soon) and it's so much faster to test/verify in local mode.

Btw, I believe there's already a ticket somewhere for this, but while I'm at
it: +1 on further separation of the LoadFunc/pig output from the Slicer/disk
reads.  Right now there are some one-offs in the Pig code to deal with
gzipped file input.  Writing loaders to deal with Lzo (with indexed blocks
that mean shifting the InputSplits slightly) has meant copying a bunch of
the logic for parsing tuples from PigStorage, etc, because the lower layer
isn't as pluggable as it could be.

Kevin

On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dv...@gmail.com> wrote:

> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>
> Vote it up so that the pig developers have a record of user interest
> in this feature.
>
> -D
>
> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
> <vi...@ubikod.com> wrote:
> > It seems that I got my answer: custom loader functions can only be used
> in
> > map reduce mode, not local mode: in local mode, the file specified must
> be a
> > real file.
> >
> > Vincent BARAT a écrit :
> >>
> >> Hello,
> >>
> >> In the process of to trying to add the support for HBase 0.20.0 in PIG
> >> (trunk) I was trying the tutorial from PIG documentation:
> >>
> >> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
> >>
> >> Unfortunately, when I try:
> >>
> >> A = LOAD '27' USING RangeSlicer();
> >> dump A;
> >>
> >> PIG reports the following error:
> >>
> >> 2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt -
> >> ERROR 2081: Unable to setup the load function.
> >>
> >> If I provide an existing file, instead of '27', I no longer have this
> >> error, but the output of the dump function is empty.
> >>
> >> Any idea ?
> >>
> >>
> >> Here is my RangeSlicer() code:
> >>
> >> =========================================================
> >>
> >>
> >> package com.ubikod.ermin.backend.pigudfs;
> >>
> >> import java.io.IOException;
> >>
> >> import org.apache.commons.logging.Log;
> >> import org.apache.commons.logging.LogFactory;
> >> import org.apache.pig.ExecType;
> >> import org.apache.pig.LoadFunc;
> >> import org.apache.pig.Slice;
> >> import org.apache.pig.Slicer;
> >> import org.apache.pig.backend.datastorage.DataStorage;
> >> import org.apache.pig.builtin.Utf8StorageConverter;
> >> import org.apache.pig.data.Tuple;
> >> import org.apache.pig.impl.io.BufferedPositionedInputStream;
> >> import org.apache.pig.impl.logicalLayer.schema.Schema;
> >>
> >> public class RangeSlicer extends Utf8StorageConverter implements Slicer,
> >>  LoadFunc
> >> {
> >>  private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
> >>
> >>  public RangeSlicer()
> >>  {
> >>    LOG.info("RangeSlicer");
> >>  }
> >>
> >>  /**
> >>   * Expects location to be a Stringified integer, and makes
> >>   * Integer.parseInt(location) slices. Each slice generates a single
> >> value, its
> >>   * index in the sequence of slices.
> >>   */
> >>  public Slice[] slice(DataStorage store, String location) throws
> >> IOException
> >>  {
> >>    LOG.info("slice #################" + location);
> >>    location = "30";
> >>    // Note: validate has already made sure that location is an integer
> >>    int numslices = Integer.parseInt(location);
> >>    LOG.info("slice #################" + numslices);
> >>    Slice[] slices = new Slice[numslices];
> >>    for (int i = 0; i < slices.length; i++)
> >>    {
> >>      slices[i] = new SingleValueSlice(i);
> >>    }
> >>    return slices;
> >>  }
> >>
> >>  public void validate(DataStorage store, String location) throws
> >> IOException
> >>  {
> >>    try
> >>    {
> >>      LOG.info("validate #################" + location);
> >>      Integer.parseInt("30");
> >>      LOG.info("validate #################" + location);
> >>    }
> >>    catch (NumberFormatException nfe)
> >>    {
> >>      throw new IOException(nfe.getMessage());
> >>    }
> >>  }
> >>
> >>  /**
> >>   * A Slice that returns a single value from next.
> >>   */
> >>  public static class SingleValueSlice implements Slice
> >>  {
> >>    // note this value is set by the Slicer and will get serialized and
> >>    // deserialized at the remote processing node
> >>    public int val;
> >>    // since we just have a single value, we can use a boolean rather
> than
> >> a
> >>    // counter
> >>    private transient boolean read;
> >>
> >>    public SingleValueSlice(int value)
> >>    {
> >>      LOG.info("SingleValueSlice #################" + value);
> >>
> >>      this.val = value;
> >>    }
> >>
> >>    public void close() throws IOException
> >>    {
> >>    }
> >>
> >>    public long getLength()
> >>    {
> >>      return 1;
> >>    }
> >>
> >>    public String[] getLocations()
> >>    {
> >>      return new String[0];
> >>    }
> >>
> >>    public long getStart()
> >>    {
> >>      return 0;
> >>    }
> >>
> >>    public long getPos() throws IOException
> >>    {
> >>      return read ? 1 : 0;
> >>    }
> >>
> >>    public float getProgress() throws IOException
> >>    {
> >>      return read ? 1 : 0;
> >>    }
> >>
> >>    public void init(DataStorage store) throws IOException
> >>    {
> >>    }
> >>
> >>    public boolean next(Tuple value) throws IOException
> >>    {
> >>      if (!read)
> >>      {
> >>        LOG.info("next #################" + value);
> >>
> >>        value.append(val);
> >>        read = true;
> >>        return true;
> >>      }
> >>      return false;
> >>    }
> >>
> >>    private static final long serialVersionUID = 1L;
> >>  }
> >>
> >>  @Override
> >>  public void bindTo(String arg0, BufferedPositionedInputStream arg1,
> >>    long arg2, long arg3) throws IOException
> >>  {
> >>    LOG.info("bindTo #################" + arg0);
> >>  }
> >>
> >>  @Override
> >>  public Schema determineSchema(String arg0, ExecType arg1, DataStorage
> >> arg2)
> >>    throws IOException
> >>  {
> >>    // TODO Auto-generated method stub
> >>    return null;
> >>  }
> >>
> >>  @Override
> >>  public void fieldsToRead(Schema arg0)
> >>  {
> >>    // TODO Auto-generated method stub
> >>  }
> >>
> >>  @Override
> >>  public Tuple getNext() throws IOException
> >>  {
> >>    // TODO Auto-generated method stub
> >>    return null;
> >>  }
> >> }
> >
>

Re: Issue with LoadFunc & Slicer

Posted by Vincent BARAT <vi...@ubikod.com>.
Hummm... Or the captcha is buggy, or I'm getting blind: I cannot 
manage to signup to your JIRA !

Dmitriy Ryaboy a écrit :
> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
> 
> Vote it up so that the pig developers have a record of user interest
> in this feature.
> 
> -D
> 
> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
> <vi...@ubikod.com> wrote:
>> It seems that I got my answer: custom loader functions can only be used in
>> map reduce mode, not local mode: in local mode, the file specified must be a
>> real file.
>>
>> Vincent BARAT a écrit :
>>> Hello,
>>>
>>> In the process of to trying to add the support for HBase 0.20.0 in PIG
>>> (trunk) I was trying the tutorial from PIG documentation:
>>>
>>> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
>>>
>>> Unfortunately, when I try:
>>>
>>> A = LOAD '27' USING RangeSlicer();
>>> dump A;
>>>
>>> PIG reports the following error:
>>>
>>> 2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt -
>>> ERROR 2081: Unable to setup the load function.
>>>
>>> If I provide an existing file, instead of '27', I no longer have this
>>> error, but the output of the dump function is empty.
>>>
>>> Any idea ?
>>>
>>>
>>> Here is my RangeSlicer() code:
>>>
>>> =========================================================
>>>
>>>
>>> package com.ubikod.ermin.backend.pigudfs;
>>>
>>> import java.io.IOException;
>>>
>>> import org.apache.commons.logging.Log;
>>> import org.apache.commons.logging.LogFactory;
>>> import org.apache.pig.ExecType;
>>> import org.apache.pig.LoadFunc;
>>> import org.apache.pig.Slice;
>>> import org.apache.pig.Slicer;
>>> import org.apache.pig.backend.datastorage.DataStorage;
>>> import org.apache.pig.builtin.Utf8StorageConverter;
>>> import org.apache.pig.data.Tuple;
>>> import org.apache.pig.impl.io.BufferedPositionedInputStream;
>>> import org.apache.pig.impl.logicalLayer.schema.Schema;
>>>
>>> public class RangeSlicer extends Utf8StorageConverter implements Slicer,
>>>  LoadFunc
>>> {
>>>  private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
>>>
>>>  public RangeSlicer()
>>>  {
>>>    LOG.info("RangeSlicer");
>>>  }
>>>
>>>  /**
>>>   * Expects location to be a Stringified integer, and makes
>>>   * Integer.parseInt(location) slices. Each slice generates a single
>>> value, its
>>>   * index in the sequence of slices.
>>>   */
>>>  public Slice[] slice(DataStorage store, String location) throws
>>> IOException
>>>  {
>>>    LOG.info("slice #################" + location);
>>>    location = "30";
>>>    // Note: validate has already made sure that location is an integer
>>>    int numslices = Integer.parseInt(location);
>>>    LOG.info("slice #################" + numslices);
>>>    Slice[] slices = new Slice[numslices];
>>>    for (int i = 0; i < slices.length; i++)
>>>    {
>>>      slices[i] = new SingleValueSlice(i);
>>>    }
>>>    return slices;
>>>  }
>>>
>>>  public void validate(DataStorage store, String location) throws
>>> IOException
>>>  {
>>>    try
>>>    {
>>>      LOG.info("validate #################" + location);
>>>      Integer.parseInt("30");
>>>      LOG.info("validate #################" + location);
>>>    }
>>>    catch (NumberFormatException nfe)
>>>    {
>>>      throw new IOException(nfe.getMessage());
>>>    }
>>>  }
>>>
>>>  /**
>>>   * A Slice that returns a single value from next.
>>>   */
>>>  public static class SingleValueSlice implements Slice
>>>  {
>>>    // note this value is set by the Slicer and will get serialized and
>>>    // deserialized at the remote processing node
>>>    public int val;
>>>    // since we just have a single value, we can use a boolean rather than
>>> a
>>>    // counter
>>>    private transient boolean read;
>>>
>>>    public SingleValueSlice(int value)
>>>    {
>>>      LOG.info("SingleValueSlice #################" + value);
>>>
>>>      this.val = value;
>>>    }
>>>
>>>    public void close() throws IOException
>>>    {
>>>    }
>>>
>>>    public long getLength()
>>>    {
>>>      return 1;
>>>    }
>>>
>>>    public String[] getLocations()
>>>    {
>>>      return new String[0];
>>>    }
>>>
>>>    public long getStart()
>>>    {
>>>      return 0;
>>>    }
>>>
>>>    public long getPos() throws IOException
>>>    {
>>>      return read ? 1 : 0;
>>>    }
>>>
>>>    public float getProgress() throws IOException
>>>    {
>>>      return read ? 1 : 0;
>>>    }
>>>
>>>    public void init(DataStorage store) throws IOException
>>>    {
>>>    }
>>>
>>>    public boolean next(Tuple value) throws IOException
>>>    {
>>>      if (!read)
>>>      {
>>>        LOG.info("next #################" + value);
>>>
>>>        value.append(val);
>>>        read = true;
>>>        return true;
>>>      }
>>>      return false;
>>>    }
>>>
>>>    private static final long serialVersionUID = 1L;
>>>  }
>>>
>>>  @Override
>>>  public void bindTo(String arg0, BufferedPositionedInputStream arg1,
>>>    long arg2, long arg3) throws IOException
>>>  {
>>>    LOG.info("bindTo #################" + arg0);
>>>  }
>>>
>>>  @Override
>>>  public Schema determineSchema(String arg0, ExecType arg1, DataStorage
>>> arg2)
>>>    throws IOException
>>>  {
>>>    // TODO Auto-generated method stub
>>>    return null;
>>>  }
>>>
>>>  @Override
>>>  public void fieldsToRead(Schema arg0)
>>>  {
>>>    // TODO Auto-generated method stub
>>>  }
>>>
>>>  @Override
>>>  public Tuple getNext() throws IOException
>>>  {
>>>    // TODO Auto-generated method stub
>>>    return null;
>>>  }
>>> }
> 
> 

Re: Issue with LoadFunc & Slicer

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612

Vote it up so that the pig developers have a record of user interest
in this feature.

-D

On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
<vi...@ubikod.com> wrote:
> It seems that I got my answer: custom loader functions can only be used in
> map reduce mode, not local mode: in local mode, the file specified must be a
> real file.
>
> Vincent BARAT a écrit :
>>
>> Hello,
>>
>> In the process of to trying to add the support for HBase 0.20.0 in PIG
>> (trunk) I was trying the tutorial from PIG documentation:
>>
>> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
>>
>> Unfortunately, when I try:
>>
>> A = LOAD '27' USING RangeSlicer();
>> dump A;
>>
>> PIG reports the following error:
>>
>> 2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt -
>> ERROR 2081: Unable to setup the load function.
>>
>> If I provide an existing file, instead of '27', I no longer have this
>> error, but the output of the dump function is empty.
>>
>> Any idea ?
>>
>>
>> Here is my RangeSlicer() code:
>>
>> =========================================================
>>
>>
>> package com.ubikod.ermin.backend.pigudfs;
>>
>> import java.io.IOException;
>>
>> import org.apache.commons.logging.Log;
>> import org.apache.commons.logging.LogFactory;
>> import org.apache.pig.ExecType;
>> import org.apache.pig.LoadFunc;
>> import org.apache.pig.Slice;
>> import org.apache.pig.Slicer;
>> import org.apache.pig.backend.datastorage.DataStorage;
>> import org.apache.pig.builtin.Utf8StorageConverter;
>> import org.apache.pig.data.Tuple;
>> import org.apache.pig.impl.io.BufferedPositionedInputStream;
>> import org.apache.pig.impl.logicalLayer.schema.Schema;
>>
>> public class RangeSlicer extends Utf8StorageConverter implements Slicer,
>>  LoadFunc
>> {
>>  private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
>>
>>  public RangeSlicer()
>>  {
>>    LOG.info("RangeSlicer");
>>  }
>>
>>  /**
>>   * Expects location to be a Stringified integer, and makes
>>   * Integer.parseInt(location) slices. Each slice generates a single
>> value, its
>>   * index in the sequence of slices.
>>   */
>>  public Slice[] slice(DataStorage store, String location) throws
>> IOException
>>  {
>>    LOG.info("slice #################" + location);
>>    location = "30";
>>    // Note: validate has already made sure that location is an integer
>>    int numslices = Integer.parseInt(location);
>>    LOG.info("slice #################" + numslices);
>>    Slice[] slices = new Slice[numslices];
>>    for (int i = 0; i < slices.length; i++)
>>    {
>>      slices[i] = new SingleValueSlice(i);
>>    }
>>    return slices;
>>  }
>>
>>  public void validate(DataStorage store, String location) throws
>> IOException
>>  {
>>    try
>>    {
>>      LOG.info("validate #################" + location);
>>      Integer.parseInt("30");
>>      LOG.info("validate #################" + location);
>>    }
>>    catch (NumberFormatException nfe)
>>    {
>>      throw new IOException(nfe.getMessage());
>>    }
>>  }
>>
>>  /**
>>   * A Slice that returns a single value from next.
>>   */
>>  public static class SingleValueSlice implements Slice
>>  {
>>    // note this value is set by the Slicer and will get serialized and
>>    // deserialized at the remote processing node
>>    public int val;
>>    // since we just have a single value, we can use a boolean rather than
>> a
>>    // counter
>>    private transient boolean read;
>>
>>    public SingleValueSlice(int value)
>>    {
>>      LOG.info("SingleValueSlice #################" + value);
>>
>>      this.val = value;
>>    }
>>
>>    public void close() throws IOException
>>    {
>>    }
>>
>>    public long getLength()
>>    {
>>      return 1;
>>    }
>>
>>    public String[] getLocations()
>>    {
>>      return new String[0];
>>    }
>>
>>    public long getStart()
>>    {
>>      return 0;
>>    }
>>
>>    public long getPos() throws IOException
>>    {
>>      return read ? 1 : 0;
>>    }
>>
>>    public float getProgress() throws IOException
>>    {
>>      return read ? 1 : 0;
>>    }
>>
>>    public void init(DataStorage store) throws IOException
>>    {
>>    }
>>
>>    public boolean next(Tuple value) throws IOException
>>    {
>>      if (!read)
>>      {
>>        LOG.info("next #################" + value);
>>
>>        value.append(val);
>>        read = true;
>>        return true;
>>      }
>>      return false;
>>    }
>>
>>    private static final long serialVersionUID = 1L;
>>  }
>>
>>  @Override
>>  public void bindTo(String arg0, BufferedPositionedInputStream arg1,
>>    long arg2, long arg3) throws IOException
>>  {
>>    LOG.info("bindTo #################" + arg0);
>>  }
>>
>>  @Override
>>  public Schema determineSchema(String arg0, ExecType arg1, DataStorage
>> arg2)
>>    throws IOException
>>  {
>>    // TODO Auto-generated method stub
>>    return null;
>>  }
>>
>>  @Override
>>  public void fieldsToRead(Schema arg0)
>>  {
>>    // TODO Auto-generated method stub
>>  }
>>
>>  @Override
>>  public Tuple getNext() throws IOException
>>  {
>>    // TODO Auto-generated method stub
>>    return null;
>>  }
>> }
>

Re: Issue with LoadFunc & Slicer

Posted by Vincent BARAT <vi...@ubikod.com>.
It seems that I got my answer: custom loader functions can only be 
used in map reduce mode, not local mode: in local mode, the file 
specified must be a real file.

Vincent BARAT a écrit :
> Hello,
> 
> In the process of to trying to add the support for HBase 0.20.0 in PIG 
> (trunk) I was trying the tutorial from PIG documentation:
> 
> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
> 
> Unfortunately, when I try:
> 
> A = LOAD '27' USING RangeSlicer();
> dump A;
> 
> PIG reports the following error:
> 
> 2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt - 
> ERROR 2081: Unable to setup the load function.
> 
> If I provide an existing file, instead of '27', I no longer have this 
> error, but the output of the dump function is empty.
> 
> Any idea ?
> 
> 
> Here is my RangeSlicer() code:
> 
> =========================================================
> 
> 
> package com.ubikod.ermin.backend.pigudfs;
> 
> import java.io.IOException;
> 
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.pig.ExecType;
> import org.apache.pig.LoadFunc;
> import org.apache.pig.Slice;
> import org.apache.pig.Slicer;
> import org.apache.pig.backend.datastorage.DataStorage;
> import org.apache.pig.builtin.Utf8StorageConverter;
> import org.apache.pig.data.Tuple;
> import org.apache.pig.impl.io.BufferedPositionedInputStream;
> import org.apache.pig.impl.logicalLayer.schema.Schema;
> 
> public class RangeSlicer extends Utf8StorageConverter implements Slicer,
>   LoadFunc
> {
>   private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
> 
>   public RangeSlicer()
>   {
>     LOG.info("RangeSlicer");
>   }
> 
>   /**
>    * Expects location to be a Stringified integer, and makes
>    * Integer.parseInt(location) slices. Each slice generates a single 
> value, its
>    * index in the sequence of slices.
>    */
>   public Slice[] slice(DataStorage store, String location) throws 
> IOException
>   {
>     LOG.info("slice #################" + location);
>     location = "30";
>     // Note: validate has already made sure that location is an integer
>     int numslices = Integer.parseInt(location);
>     LOG.info("slice #################" + numslices);
>     Slice[] slices = new Slice[numslices];
>     for (int i = 0; i < slices.length; i++)
>     {
>       slices[i] = new SingleValueSlice(i);
>     }
>     return slices;
>   }
> 
>   public void validate(DataStorage store, String location) throws 
> IOException
>   {
>     try
>     {
>       LOG.info("validate #################" + location);
>       Integer.parseInt("30");
>       LOG.info("validate #################" + location);
>     }
>     catch (NumberFormatException nfe)
>     {
>       throw new IOException(nfe.getMessage());
>     }
>   }
> 
>   /**
>    * A Slice that returns a single value from next.
>    */
>   public static class SingleValueSlice implements Slice
>   {
>     // note this value is set by the Slicer and will get serialized and
>     // deserialized at the remote processing node
>     public int val;
>     // since we just have a single value, we can use a boolean rather 
> than a
>     // counter
>     private transient boolean read;
> 
>     public SingleValueSlice(int value)
>     {
>       LOG.info("SingleValueSlice #################" + value);
> 
>       this.val = value;
>     }
> 
>     public void close() throws IOException
>     {
>     }
> 
>     public long getLength()
>     {
>       return 1;
>     }
> 
>     public String[] getLocations()
>     {
>       return new String[0];
>     }
> 
>     public long getStart()
>     {
>       return 0;
>     }
> 
>     public long getPos() throws IOException
>     {
>       return read ? 1 : 0;
>     }
> 
>     public float getProgress() throws IOException
>     {
>       return read ? 1 : 0;
>     }
> 
>     public void init(DataStorage store) throws IOException
>     {
>     }
> 
>     public boolean next(Tuple value) throws IOException
>     {
>       if (!read)
>       {
>         LOG.info("next #################" + value);
> 
>         value.append(val);
>         read = true;
>         return true;
>       }
>       return false;
>     }
> 
>     private static final long serialVersionUID = 1L;
>   }
> 
>   @Override
>   public void bindTo(String arg0, BufferedPositionedInputStream arg1,
>     long arg2, long arg3) throws IOException
>   {
>     LOG.info("bindTo #################" + arg0);
>   }
> 
>   @Override
>   public Schema determineSchema(String arg0, ExecType arg1, DataStorage 
> arg2)
>     throws IOException
>   {
>     // TODO Auto-generated method stub
>     return null;
>   }
> 
>   @Override
>   public void fieldsToRead(Schema arg0)
>   {
>     // TODO Auto-generated method stub
>   }
> 
>   @Override
>   public Tuple getNext() throws IOException
>   {
>     // TODO Auto-generated method stub
>     return null;
>   }
> }