Posted to user@pig.apache.org by Vincent BARAT <vi...@ubikod.com> on 2009/09/14 15:38:23 UTC
Issue with LoadFunc & Slicer
Hello,
While trying to add support for HBase 0.20.0 to Pig (trunk), I was
working through the tutorial from the Pig documentation:
http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
Unfortunately, when I try:
A = LOAD '27' USING RangeSlicer();
dump A;
PIG reports the following error:
2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2081: Unable to setup the load function.
If I provide an existing file instead of '27', the error goes away,
but the output of the dump is empty.
Any ideas?
Here is my RangeSlicer() code:
=========================================================
package com.ubikod.ermin.backend.pigudfs;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RangeSlicer extends Utf8StorageConverter implements Slicer, LoadFunc
{
    private static final Log LOG = LogFactory.getLog(RangeSlicer.class);

    public RangeSlicer()
    {
        LOG.info("RangeSlicer");
    }

    /**
     * Expects location to be a Stringified integer, and makes
     * Integer.parseInt(location) slices. Each slice generates a single value,
     * its index in the sequence of slices.
     */
    public Slice[] slice(DataStorage store, String location) throws IOException
    {
        LOG.info("slice #################" + location);
        location = "30";
        // Note: validate has already made sure that location is an integer
        int numslices = Integer.parseInt(location);
        LOG.info("slice #################" + numslices);
        Slice[] slices = new Slice[numslices];
        for (int i = 0; i < slices.length; i++)
        {
            slices[i] = new SingleValueSlice(i);
        }
        return slices;
    }

    public void validate(DataStorage store, String location) throws IOException
    {
        try
        {
            LOG.info("validate #################" + location);
            Integer.parseInt("30");
            LOG.info("validate #################" + location);
        }
        catch (NumberFormatException nfe)
        {
            throw new IOException(nfe.getMessage());
        }
    }

    /**
     * A Slice that returns a single value from next.
     */
    public static class SingleValueSlice implements Slice
    {
        // note this value is set by the Slicer and will get serialized and
        // deserialized at the remote processing node
        public int val;
        // since we just have a single value, we can use a boolean rather than a
        // counter
        private transient boolean read;

        public SingleValueSlice(int value)
        {
            LOG.info("SingleValueSlice #################" + value);
            this.val = value;
        }

        public void close() throws IOException
        {
        }

        public long getLength()
        {
            return 1;
        }

        public String[] getLocations()
        {
            return new String[0];
        }

        public long getStart()
        {
            return 0;
        }

        public long getPos() throws IOException
        {
            return read ? 1 : 0;
        }

        public float getProgress() throws IOException
        {
            return read ? 1 : 0;
        }

        public void init(DataStorage store) throws IOException
        {
        }

        public boolean next(Tuple value) throws IOException
        {
            if (!read)
            {
                LOG.info("next #################" + value);
                value.append(val);
                read = true;
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = 1L;
    }

    @Override
    public void bindTo(String arg0, BufferedPositionedInputStream arg1,
        long arg2, long arg3) throws IOException
    {
        LOG.info("bindTo #################" + arg0);
    }

    @Override
    public Schema determineSchema(String arg0, ExecType arg1, DataStorage arg2)
        throws IOException
    {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void fieldsToRead(Schema arg0)
    {
        // TODO Auto-generated method stub
    }

    @Override
    public Tuple getNext() throws IOException
    {
        // TODO Auto-generated method stub
        return null;
    }
}
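For reference, the behaviour described in the Javadoc on slice() (parse the location as an integer N, then produce N slices that each yield their own index) can be checked outside Pig. The following is only a stand-alone sketch under stated assumptions: the class RangeSlicerSketch and its methods are hypothetical stand-ins, they use no Pig types, and they throw IllegalArgumentException where the real validate() would throw IOException. It also reads the location that was actually passed in, rather than the hardcoded "30" left in the code above for debugging.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RangeSlicer's logic; not part of the Pig API.
class RangeSlicerSketch {

    // Counterpart of validate(): the location must be a non-negative integer.
    static int parseSliceCount(String location) {
        final int count;
        try {
            count = Integer.parseInt(location.trim());
        } catch (NumberFormatException nfe) {
            throw new IllegalArgumentException("Location is not an integer: " + location);
        }
        if (count < 0) {
            throw new IllegalArgumentException("Slice count must be non-negative: " + location);
        }
        return count;
    }

    // Counterpart of slice(): one value per slice, equal to the slice index.
    static List<Integer> sliceValues(String location) {
        int count = parseSliceCount(location);
        List<Integer> values = new ArrayList<Integer>(count);
        for (int i = 0; i < count; i++) {
            values.add(i);
        }
        return values;
    }

    public static void main(String[] args) {
        // With the location from the LOAD statement, this yields the values 0 through 26.
        System.out.println(sliceValues("27"));
    }
}
```

If the Pig-based loader worked as the tutorial intends, dumping A for location '27' would be expected to show the same 27 values, one per tuple.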
Re: Issue with LoadFunc & Slicer
Posted by Alan Gates <ga...@yahoo-inc.com>.
Kevin,
Please take a look at the proposal for reworking load and store
functions that was posted a couple of days ago and see if it will
address your issues with pluggability of load functions.
http://wiki.apache.org/pig/LoadStoreRedesignProposal
Alan.
On Sep 14, 2009, at 8:58 AM, Kevin Weil wrote:
> +1 on this. I'm writing a bunch of LZO-based LoadFuncs/Slicers (commit
> coming soon) and it's so much faster to test/verify in local mode.
>
> Btw, I believe there's already a ticket somewhere for this, but while I'm at
> it: +1 on further separation of the LoadFunc/pig output from the Slicer/disk
> reads. Right now there are some one-offs in the Pig code to deal with
> gzipped file input. Writing loaders to deal with Lzo (with indexed blocks
> that mean shifting the InputSplits slightly) has meant copying a bunch of
> the logic for parsing tuples from PigStorage, etc, because the lower layer
> isn't as pluggable as it could be.
>
> Kevin
>
> On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dv...@gmail.com> wrote:
>
>> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>>
>> Vote it up so that the pig developers have a record of user interest
>> in this feature.
>>
>> -D
>>
>> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT <vi...@ubikod.com> wrote:
>>> It seems that I got my answer: custom loader functions can only be used in
>>> map reduce mode, not local mode: in local mode, the file specified must be a
>>> real file.
>>>
Re: Issue with LoadFunc & Slicer
Posted by Kevin Weil <ke...@gmail.com>.
+1 on this. I'm writing a bunch of LZO-based LoadFuncs/Slicers (commit
coming soon) and it's so much faster to test/verify in local mode.
Btw, I believe there's already a ticket somewhere for this, but while I'm at
it: +1 on further separation of the LoadFunc/pig output from the Slicer/disk
reads. Right now there are some one-offs in the Pig code to deal with
gzipped file input. Writing loaders to deal with Lzo (with indexed blocks
that mean shifting the InputSplits slightly) has meant copying a bunch of
the logic for parsing tuples from PigStorage, etc, because the lower layer
isn't as pluggable as it could be.
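To illustrate what "shifting the InputSplits slightly" for indexed blocks could look like, here is a minimal sketch. It assumes the index is simply a sorted array of block start offsets and that a split boundary is moved forward to the next block start; the class BlockAlignSketch and its methods are hypothetical and are not taken from the LZO loaders or from Pig.

```java
import java.util.Arrays;

// Hypothetical helper; illustrates aligning split boundaries to indexed block starts.
class BlockAlignSketch {

    // Returns the first block start offset >= pos, or fileLength if pos is past the last block.
    static long alignToNextBlock(long[] sortedBlockOffsets, long pos, long fileLength) {
        int idx = Arrays.binarySearch(sortedBlockOffsets, pos);
        if (idx >= 0) {
            return sortedBlockOffsets[idx]; // pos already sits on a block boundary
        }
        int insertionPoint = -idx - 1; // index of the first offset greater than pos
        return insertionPoint < sortedBlockOffsets.length
            ? sortedBlockOffsets[insertionPoint]
            : fileLength;
    }

    // Shifts the split [start, end) so that both ends land on block boundaries.
    static long[] alignSplit(long[] sortedBlockOffsets, long start, long end, long fileLength) {
        return new long[] {
            alignToNextBlock(sortedBlockOffsets, start, fileLength),
            alignToNextBlock(sortedBlockOffsets, end, fileLength)
        };
    }

    public static void main(String[] args) {
        long[] offsets = {0L, 100L, 250L, 400L};
        long[] split = alignSplit(offsets, 120L, 300L, 500L);
        System.out.println(split[0] + " - " + split[1]);
    }
}
```

Because both ends move in the same direction, consecutive splits still cover every block exactly once: the aligned end of one split is the aligned start of the next.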
Kevin
On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dv...@gmail.com> wrote:
> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>
> Vote it up so that the pig developers have a record of user interest
> in this feature.
>
> -D
>
> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
> <vi...@ubikod.com> wrote:
> > It seems that I got my answer: custom loader functions can only be used in
> > map reduce mode, not local mode: in local mode, the file specified must be a
> > real file.
> >
Re: Issue with LoadFunc & Slicer
Posted by Vincent BARAT <vi...@ubikod.com>.
Hmm... Either the captcha is buggy or I'm going blind: I cannot
manage to sign up for your JIRA!
Dmitriy Ryaboy wrote:
> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>
> Vote it up so that the pig developers have a record of user interest
> in this feature.
>
> -D
>
> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
> <vi...@ubikod.com> wrote:
>> It seems that I got my answer: custom loader functions can only be used in
>> map reduce mode, not local mode: in local mode, the file specified must be a
>> real file.
>>
Re: Issue with LoadFunc & Slicer
Posted by Dmitriy Ryaboy <dv...@gmail.com>.
There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
Vote it up so that the pig developers have a record of user interest
in this feature.
-D
On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
<vi...@ubikod.com> wrote:
> It seems that I got my answer: custom loader functions can only be used in
> map reduce mode, not local mode: in local mode, the file specified must be a
> real file.
>
Re: Issue with LoadFunc & Slicer
Posted by Vincent BARAT <vi...@ubikod.com>.
It seems that I found my answer: custom loader functions can only be
used in map reduce mode, not in local mode. In local mode, the file
specified must be a real file.