Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2015/08/07 11:28:05 UTC

Invalid argument reading a file containing a Kryo object

Hi to all,
I'm trying to read a file serialized with Kryo, but I get this exception
(because createInputSplits creates 8 input splits, of which only one
is not empty).

Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.position0(Native Method)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

-----------------------------------------------
My program is basically the following:

public static void main(String[] args) throws Exception {

    ...
    // try-with-resources used to autoclose resources
    try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
        // serialise object
        Kryo kryo = new Kryo();
        kryo.writeClassAndObject(output, myObj);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    // deserialise object
    myObj = null;

    try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
        Kryo kryo = new Kryo();
        myObj = (MyClass) kryo.readClassAndObject(input);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
    Configuration configuration = new Configuration();
    configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);

    TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
    final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
    inputFormat.setFilePath("file:/tmp/KryoTest.ser");
    inputFormat.configure(configuration);

    DataSet<MyClass> ds = env.createInput(inputFormat);
    ds.print();
}

private static final class MyClassSerializer extends Serializer<MyClass> {

    @Override
    public void write(Kryo kryo, Output output, MyClass object) {
        kryo.writeClassAndObject(output, object);
    }

    @Override
    public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
        return (MyClass) kryo.readClassAndObject(input);
    }
}

Am I doing something wrong?

Best,
Flavio

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks a lot!

2015-08-10 12:20 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Done through https://issues.apache.org/jira/browse/FLINK-2503
>
> Thanks again,
> Flavio
>
> On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Congrats that you got your InputFormat working!
>> It is true, there can be a few inconsistencies in the Formats derived
>> from FileInputFormat.
>>
>> It would be great if you could open JIRAs for these issues. Otherwise,
>> they might get lost on the mailing list.
>>
>> Thanks, Fabian
>>
>> 2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> Hi Fabian,
>>> thanks to your help I finally managed to generate a DataSet from my
>>> folder, but I think there are some inconsistencies in the hierarchy of
>>> InputFormats.
>>> The *BinaryInputFormat*/*TypeSerializerInputFormat* should somehow
>>> inherit the behaviour of the FileInputFormat (i.e. respect *unsplittable*
>>> and *enumerateNestedFiles*), but they don't take those flags into account.
>>> Moreover, in the *TypeSerializerInputFormat* there's a *"// TODO: fix
>>> this shit"* that maybe should be removed or fixed :)
>>>
>>> Also, keeping testForUnsplittable and decorateInputStream aligned is
>>> somewhat dangerous.
>>> And maybe the visibility of getBlockIndexForPosition should be changed to
>>> protected?
>>>
>>> So basically, I needed to implement a TypeSerializerInputFormat<RowBundle>,
>>> but to achieve that I had to write a lot of overrides. Am I doing something
>>> wrong, or could those input formats be improved? This is my IF code
>>> (*remark*: everything after the comment *"Copied from FileInputFormat
>>> (override TypeSerializerInputFormat)"* is copied and pasted from
>>> FileInputFormat, so MY code ends there):
>>>
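>>> import java.io.IOException;
>>> import java.util.ArrayList;
>>> import java.util.Arrays;
>>> import java.util.HashSet;
>>> import java.util.List;
>>> import java.util.Set;
>>>
>>> import org.apache.flink.api.java.io.TypeSerializerInputFormat;
>>> import org.apache.flink.api.java.typeutils.GenericTypeInfo;
>>> import org.apache.flink.core.fs.BlockLocation;
>>> import org.apache.flink.core.fs.FSDataInputStream;
>>> import org.apache.flink.core.fs.FileInputSplit;
>>> import org.apache.flink.core.fs.FileStatus;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>     private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);
>>>
>>>     /** The fraction that the last split may be larger than the others. */
>>>     private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>>>
>>>     /** Set once the single object of the current split has been read. */
>>>     private boolean objectRead;
>>>
>>>     public RowBundleInputFormat() {
>>>         super(new GenericTypeInfo<>(RowBundle.class));
>>>         // each file holds exactly one serialized object, so never split it
>>>         unsplittable = true;
>>>     }
>>>
>>>     @Override
>>>     protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
>>>             FileInputSplit fileSplit) throws Throwable {
>>>         // return the raw stream: no decompressing wrapper
>>>         return inputStream;
>>>     }
>>>
>>>     @Override
>>>     protected boolean testForUnsplittable(FileStatus pathFile) {
>>>         return true;
>>>     }
>>>
>>>     @Override
>>>     public void open(FileInputSplit split) throws IOException {
>>>         super.open(split);
>>>         objectRead = false;
>>>     }
>>>
>>>     @Override
>>>     public boolean reachedEnd() throws IOException {
>>>         return this.objectRead;
>>>     }
>>>
>>>     @Override
>>>     public RowBundle nextRecord(RowBundle reuse) throws IOException {
>>>         RowBundle yourObject = super.nextRecord(reuse);
>>>         this.objectRead = true; // read only one object
>>>         return yourObject;
>>>     }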
>>>
>>>     // -------------------------------------------------------------------
>>>     // Copied from FileInputFormat (override TypeSerializerInputFormat)
>>>     // -------------------------------------------------------------------
>>>     @Override
>>>     public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>         if (minNumSplits < 1) {
>>>             throw new IllegalArgumentException("Number of input splits has to be at least 1.");
>>>         }
>>>
>>>         // take the desired number of splits into account
>>>         minNumSplits = Math.max(minNumSplits, this.numSplits);
>>>
>>>         final Path path = this.filePath;
>>>         final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
>>>
>>>         // get all the files that are involved in the splits
>>>         List<FileStatus> files = new ArrayList<FileStatus>();
>>>         long totalLength = 0;
>>>
>>>         final FileSystem fs = path.getFileSystem();
>>>         final FileStatus pathFile = fs.getFileStatus(path);
>>>
>>>         if (pathFile.isDir()) {
>>>             // input is directory. list all contained files
>>>             final FileStatus[] dir = fs.listStatus(path);
>>>             for (int i = 0; i < dir.length; i++) {
>>>                 if (dir[i].isDir()) {
>>>                     if (enumerateNestedFiles) {
>>>                         if (acceptFile(dir[i])) {
>>>                             totalLength += addNestedFiles(dir[i].getPath(), files, 0, true);
>>>                         } else {
>>>                             if (LOG.isDebugEnabled()) {
>>>                                 LOG.debug("Directory " + dir[i].getPath().toString()
>>>                                         + " did not pass the file-filter and is excluded.");
>>>                             }
>>>                         }
>>>                     }
>>>                 } else {
>>>                     if (acceptFile(dir[i])) {
>>>                         files.add(dir[i]);
>>>                         totalLength += dir[i].getLen();
>>>                         // as soon as there is one deflate file in a directory,
>>>                         // we can not split it
>>>                         testForUnsplittable(dir[i]);
>>>                     } else {
>>>                         if (LOG.isDebugEnabled()) {
>>>                             LOG.debug("File " + dir[i].getPath().toString()
>>>                                     + " did not pass the file-filter and is excluded.");
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } else {
>>>             testForUnsplittable(pathFile);
>>>
>>>             files.add(pathFile);
>>>             totalLength += pathFile.getLen();
>>>         }
>>>
>>>         // returns if unsplittable
>>>         if (unsplittable) {
>>>             int splitNum = 0;
>>>             for (final FileStatus file : files) {
>>>                 final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
>>>                 Set<String> hosts = new HashSet<String>();
>>>                 for (BlockLocation block : blocks) {
>>>                     hosts.addAll(Arrays.asList(block.getHosts()));
>>>                 }
>>>                 long len = file.getLen();
>>>                 if (testForUnsplittable(file)) {
>>>                     len = READ_WHOLE_SPLIT_FLAG;
>>>                 }
>>>                 FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len,
>>>                         hosts.toArray(new String[hosts.size()]));
>>>                 inputSplits.add(fis);
>>>             }
>>>             return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>>>         }
>>>
>>>         final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
>>>                 : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1));
>>>
>>>         // now that we have the files, generate the splits
>>>         int splitNum = 0;
>>>         for (final FileStatus file : files) {
>>>
>>>             final long len = file.getLen();
>>>             final long blockSize = file.getBlockSize();
>>>
>>>             final long minSplitSize;
>>>             if (this.minSplitSize <= blockSize) {
>>>                 minSplitSize = this.minSplitSize;
>>>             } else {
>>>                 if (LOG.isWarnEnabled()) {
>>>                     LOG.warn("Minimal split size of " + this.minSplitSize
>>>                             + " is larger than the block size of " + blockSize
>>>                             + ". Decreasing minimal split size to block size.");
>>>                 }
>>>                 minSplitSize = blockSize;
>>>             }
>>>
>>>             final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
>>>             final long halfSplit = splitSize >>> 1;
>>>
>>>             final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
>>>
>>>             if (len > 0) {
>>>
>>>                 // get the block locations and make sure they are in order with
>>>                 // respect to their offset
>>>                 final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
>>>                 Arrays.sort(blocks);
>>>
>>>                 long bytesUnassigned = len;
>>>                 long position = 0;
>>>
>>>                 int blockIndex = 0;
>>>
>>>                 while (bytesUnassigned > maxBytesForLastSplit) {
>>>                     // get the block containing the majority of the data
>>>                     blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
>>>                     // create a new split
>>>                     FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
>>>                             splitSize, blocks[blockIndex].getHosts());
>>>                     inputSplits.add(fis);
>>>
>>>                     // adjust the positions
>>>                     position += splitSize;
>>>                     bytesUnassigned -= splitSize;
>>>                 }
>>>
>>>                 // assign the last split
>>>                 if (bytesUnassigned > 0) {
>>>                     blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
>>>                     final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(),
>>>                             position, bytesUnassigned, blocks[blockIndex].getHosts());
>>>                     inputSplits.add(fis);
>>>                 }
>>>             } else {
>>>                 // special case with a file of zero bytes size
>>>                 final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
>>>                 String[] hosts;
>>>                 if (blocks.length > 0) {
>>>                     hosts = blocks[0].getHosts();
>>>                 } else {
>>>                     hosts = new String[0];
>>>                 }
>>>                 final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
>>>                 inputSplits.add(fis);
>>>             }
>>>         }
>>>
>>>         return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>>>     }
>>>
>>>     /**
>>>      * Recursively traverse the input directory structure and enumerate all
>>>      * accepted nested files.
>>>      *
>>>      * @return the total length of accepted files.
>>>      */
>>>     private long addNestedFiles(Path path, List<FileStatus> files, long length,
>>>             boolean logExcludedFiles) throws IOException {
>>>         final FileSystem fs = path.getFileSystem();
>>>
>>>         for (FileStatus dir : fs.listStatus(path)) {
>>>             if (dir.isDir()) {
>>>                 if (acceptFile(dir)) {
>>>                     // add the length of everything found in the subdirectory
>>>                     length += addNestedFiles(dir.getPath(), files, 0, logExcludedFiles);
>>>                 } else {
>>>                     if (logExcludedFiles && LOG.isDebugEnabled()) {
>>>                         LOG.debug("Directory " + dir.getPath().toString()
>>>                                 + " did not pass the file-filter and is excluded.");
>>>                     }
>>>                 }
>>>             } else {
>>>                 if (acceptFile(dir)) {
>>>                     files.add(dir);
>>>                     length += dir.getLen();
>>>                     testForUnsplittable(dir);
>>>                 } else {
>>>                     if (logExcludedFiles && LOG.isDebugEnabled()) {
>>>                         LOG.debug("File " + dir.getPath().toString()
>>>                                 + " did not pass the file-filter and is excluded.");
>>>                     }
>>>                 }
>>>             }
>>>         }
>>>         return length;
>>>     }
>>>
>>>     /**
>>>      * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
>>>      * of the file described by the given offset.
>>>      *
>>>      * @param blocks
>>>      *            The different blocks of the file. Must be ordered by their
>>>      *            offset.
>>>      * @param offset
>>>      *            The offset of the position in the file.
>>>      * @param halfSplitSize
>>>      *            Half of the split size.
>>>      * @param startIndex
>>>      *            The earliest index to look at.
>>>      * @return The index of the block containing the given position.
>>>      */
>>>     private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
>>>             long halfSplitSize, int startIndex) {
>>>         // go over all indexes after the startIndex
>>>         for (int i = startIndex; i < blocks.length; i++) {
>>>             long blockStart = blocks[i].getOffset();
>>>             long blockEnd = blockStart + blocks[i].getLength();
>>>
>>>             if (offset >= blockStart && offset < blockEnd) {
>>>                 // got the block where the split starts
>>>                 // check if the next block contains more than this one does
>>>                 if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
>>>                     return i + 1;
>>>                 } else {
>>>                     return i;
>>>                 }
>>>             }
>>>         }
>>>         throw new IllegalArgumentException("The given offset is not contained in any block.");
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>>
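The splittable branch of the copied createInputSplits sizes each split as max(minSplitSize, min(ceil(totalLength / minNumSplits), blockSize)) and lets the last split grow by up to MAX_SPLIT_SIZE_DISCREPANCY. The plain-Java sketch below reproduces only that arithmetic for a single file (block locations and hosts are left out; SplitSizing and splits are made-up names). It models the FileInputFormat logic copied above, not BinaryInputFormat's own split logic, and shows why a file holding one Kryo object needs one unsplittable split: asking for 8 splits of a 7-byte file yields 1-byte pieces that no reader could decode. The 64 MB block size is an assumed value; for a file this small any positive block size gives the same result.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSizing {
    /** Same constant as in the copied code: the last split may be up to 10% larger. */
    static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    /** Returns {start, length} pairs for one file of len bytes (single-file case only). */
    static List<long[]> splits(long len, long blockSize, long configuredMinSplitSize, int minNumSplits) {
        if (minNumSplits < 1 || blockSize < 1) {
            throw new IllegalArgumentException("minNumSplits and blockSize must be positive");
        }
        List<long[]> result = new ArrayList<>();
        if (len == 0) {
            result.add(new long[] {0, 0}); // zero-byte file: one empty split
            return result;
        }
        // with a single file, totalLength == len; this is ceil(len / minNumSplits)
        long maxSplitSize = len / minNumSplits + (len % minNumSplits == 0 ? 0 : 1);
        long minSplitSize = Math.min(configuredMinSplitSize, blockSize);
        long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
        long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

        long position = 0;
        long bytesUnassigned = len;
        while (bytesUnassigned > maxBytesForLastSplit) {
            result.add(new long[] {position, splitSize});
            position += splitSize;
            bytesUnassigned -= splitSize;
        }
        if (bytesUnassigned > 0) {
            result.add(new long[] {position, bytesUnassigned}); // possibly slightly larger last split
        }
        return result;
    }

    public static void main(String[] args) {
        // prints seven 1-byte splits, from "start=0 length=1" to "start=6 length=1"
        for (long[] s : splits(7, 64L * 1024 * 1024, 0, 8)) {
            System.out.println("start=" + s[0] + " length=" + s[1]);
        }
    }
}
```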
>>> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> You need to do something like this:
>>>>
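>>>> import java.io.IOException;
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>>
>>>> import org.apache.flink.api.common.io.FileInputFormat;
>>>> import org.apache.flink.core.fs.FileInputSplit;
>>>> import org.apache.flink.core.fs.FileStatus;
>>>> import org.apache.flink.core.fs.FileSystem;
>>>>
>>>> import com.esotericsoftware.kryo.Kryo;
>>>> import com.esotericsoftware.kryo.io.Input;
>>>>
>>>> public class YourInputFormat extends FileInputFormat<Object> {
>>>>
>>>>    private transient Kryo kryo; // Kryo is not serializable, so it is created in open()
>>>>    private boolean objectRead;
>>>>
>>>>    @Override
>>>>    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>       // Create one FileInputSplit for each file you want to read.
>>>>       // Check FileInputFormat for how to recursively enumerate files
>>>>       // (this minimal version only lists the top level of a directory).
>>>>       // Input splits must start at 0 and have a length equal to the length of the file to read.
>>>>       final FileSystem fs = this.filePath.getFileSystem();
>>>>       final FileStatus pathFile = fs.getFileStatus(this.filePath);
>>>>       final FileStatus[] files = pathFile.isDir()
>>>>             ? fs.listStatus(this.filePath) : new FileStatus[] { pathFile };
>>>>       final List<FileInputSplit> splits = new ArrayList<FileInputSplit>();
>>>>       for (FileStatus file : files) {
>>>>          if (!file.isDir()) {
>>>>             splits.add(new FileInputSplit(splits.size(), file.getPath(), 0, file.getLen(), new String[0]));
>>>>          }
>>>>       }
>>>>       return splits.toArray(new FileInputSplit[splits.size()]);
>>>>    }
>>>>
>>>>    @Override
>>>>    public void open(FileInputSplit split) throws IOException {
>>>>       super.open(split);
>>>>       kryo = new Kryo();
>>>>       objectRead = false;
>>>>    }
>>>>
>>>>    @Override
>>>>    public boolean reachedEnd() throws IOException {
>>>>       return this.objectRead;
>>>>    }
>>>>
>>>>    @Override
>>>>    public Object nextRecord(Object reuse) throws IOException {
>>>>       // use Kryo to read the single object stored in this.stream
>>>>       Object yourObject = kryo.readClassAndObject(new Input(this.stream));
>>>>       this.objectRead = true; // read only one object
>>>>       return yourObject;
>>>>    }
>>>> }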
>>>>
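The objectRead flag in the sketch above works because the caller checks reachedEnd() before every nextRecord() call. Below is a minimal plain-Java model of that contract; SimpleInputFormat, WholeSplitFormat and readAll are hypothetical names, not Flink classes, and the loop only approximates how Flink's data source task drives an input format (open each split, then call nextRecord until reachedEnd returns true).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OneRecordPerSplitDemo {

    /** Hypothetical, stripped-down input-format contract (not Flink's interface). */
    interface SimpleInputFormat<T> {
        void open(String split);
        boolean reachedEnd();
        T nextRecord();
    }

    /** Emits the whole split as a single record, using an objectRead flag. */
    static class WholeSplitFormat implements SimpleInputFormat<String> {
        private String current;
        private boolean objectRead;

        public void open(String split) {
            current = split;
            objectRead = false; // reset for every new split
        }

        public boolean reachedEnd() {
            return objectRead;
        }

        public String nextRecord() {
            objectRead = true; // read only one object
            return current;
        }
    }

    /** Drives the format: open each split, then read until reachedEnd() is true. */
    static <T> List<T> readAll(SimpleInputFormat<T> format, List<String> splits) {
        List<T> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                T record = format.nextRecord();
                if (record != null) {
                    records.add(record);
                }
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.ser", "b.ser");
        System.out.println(readAll(new WholeSplitFormat(), splits)); // prints [a.ser, b.ser]
    }
}
```

Without the reset in open(), every split after the first would be skipped; without setting the flag in nextRecord(), the loop would never terminate.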
>>>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>
>>>>> Sorry Fabian, but I don't understand what I should do :(
>>>>> Could you provide a simple code snippet to achieve this?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>>>
>>>>>> Enumeration of nested files is a feature of the FileInputFormat.
>>>>>> If you implement your own IF based on FileInputFormat as I suggested
>>>>>> before, you can use that feature.
>>>>>>
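What nested-file enumeration boils down to can be sketched in plain Java, using java.nio.file instead of Flink's FileSystem API. The filter that skips names starting with "." or "_" is modeled on what I understand to be FileInputFormat's default acceptFile behaviour and should be treated as an assumption; NestedFileEnumeration is a made-up name. Note that the sketch adds the recursive call's return value to the running total, so files in deeper directories count toward the total length.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class NestedFileEnumeration {

    /** Assumed filter: skip names starting with "." or "_" (e.g. _SUCCESS markers). */
    static boolean acceptFile(Path p) {
        String name = p.getFileName().toString();
        return !name.startsWith(".") && !name.startsWith("_");
    }

    /** Collects every accepted regular file below dir and returns their total size in bytes. */
    static long addNestedFiles(Path dir, List<Path> files) throws IOException {
        long length = 0;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (!acceptFile(entry)) {
                    continue;
                }
                if (Files.isDirectory(entry)) {
                    length += addNestedFiles(entry, files); // keep the recursive total
                } else {
                    files.add(entry);
                    length += Files.size(entry);
                }
            }
        }
        return length;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("kryo-objects");
        Files.write(root.resolve("a.ser"), new byte[7]);
        Path nested = Files.createDirectories(root.resolve("nested"));
        Files.write(nested.resolve("b.ser"), new byte[5]);
        Files.write(nested.resolve("_SUCCESS"), new byte[0]);

        List<Path> files = new ArrayList<>();
        long total = addNestedFiles(root, files);
        System.out.println(total + " bytes in " + files.size() + " files"); // prints "12 bytes in 2 files"
    }
}
```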
>>>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>>
>>>>>>> I have a directory containing a list of files, each one containing a
>>>>>>> Kryo-serialized object.
>>>>>>> With JSON-serialized objects I don't have that problem (but there I
>>>>>>> use env.readTextFile(path).withParameters(parameters),
>>>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>>>>>
>>>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I don't know your use case.
>>>>>>>> The InputFormat interface is very flexible. Directories can be
>>>>>>>> recursively read. A file can contain one or more objects. You can also make
>>>>>>>> a smarter IF and put multiple (small) files into one split...
>>>>>>>>
>>>>>>>> It is up to your use case what you need to implement.
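One way to put multiple small files into one split is to pack them greedily by size. The sketch below is an illustration only: FileEntry and group are hypothetical names, and since a Flink FileInputSplit describes a single file range, a real format would need its own split type (not shown) to carry a group of files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SmallFileGrouping {

    /** A file name with its size in bytes (illustrative type). */
    static final class FileEntry {
        final String name;
        final long size;

        FileEntry(String name, long size) {
            this.name = name;
            this.size = size;
        }
    }

    /**
     * Packs files, in the given order, into groups whose total size stays within
     * targetBytes; a file larger than the target gets a group of its own.
     * Each group would become one input split.
     */
    static List<List<FileEntry>> group(List<FileEntry> files, long targetBytes) {
        List<List<FileEntry>> groups = new ArrayList<>();
        List<FileEntry> current = new ArrayList<>();
        long currentBytes = 0;
        for (FileEntry f : files) {
            if (!current.isEmpty() && currentBytes + f.size > targetBytes) {
                groups.add(current); // close the group before it exceeds the target
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(f);
            currentBytes += f.size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<FileEntry> files = Arrays.asList(
                new FileEntry("a", 40), new FileEntry("b", 30), new FileEntry("c", 50),
                new FileEntry("d", 200), new FileEntry("e", 10));
        // prints "ab", "c", "d", "e" on separate lines
        for (List<FileEntry> g : group(files, 100)) {
            StringBuilder sb = new StringBuilder();
            for (FileEntry f : g) {
                sb.append(f.name);
            }
            System.out.println(sb);
        }
    }
}
```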
>>>>>>>>
>>>>>>>>
>>>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>
>>>>>>>>> Is that also necessary when just recursively reading an entire
>>>>>>>>> directory containing one object per file?
>>>>>>>>>
>>>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You could implement your own InputFormat based on FileInputFormat
>>>>>>>>>> and override the createInputSplits method to create just a single split
>>>>>>>>>> per file.
>>>>>>>>>>
>>>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> So what should I do?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah, I checked the code.
>>>>>>>>>>>>
>>>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>>>>>> BinaryOutputFormat.
>>>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which
>>>>>>>>>>>> does not provide the metadata.
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I also tried with
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file
>>>>>>>>>>>>>>> so it should be able to deserialize it, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you create your file by just sequentially writing all objects to the
>>>>>>>>>>>>>>>> file using Kryo, you can only read it with a parallelism of 1.
>>>>>>>>>>>>>>>> Writing binary files in a way that they can be read in parallel is a bit
>>>>>>>>>>>>>>>> tricky (and not specific to Flink).
>>>>>>>>>>>>>>>>
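To make the point about parallel reads concrete: a reader can start in the middle of a file only if the file tells it where records begin. The toy layout below is invented for illustration and is not Flink's on-disk format: records are length-prefixed, never span a block, and every fixed-size block ends with a 4-byte record count. Because each block is self-describing, any block can be decoded on its own, which is the kind of per-block metadata a plain sequential Kryo stream lacks. BlockedRecordFile, write and readBlock are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BlockedRecordFile {
    /** Bytes reserved at the end of each block for the record count. */
    static final int TRAILER = Integer.BYTES;

    /** Packs length-prefixed records into fixed-size blocks; a record never spans two blocks. */
    static byte[] write(List<byte[]> records, int blockSize) {
        List<ByteBuffer> blocks = new ArrayList<>();
        ByteBuffer block = ByteBuffer.allocate(blockSize);
        int count = 0;
        for (byte[] r : records) {
            int needed = Integer.BYTES + r.length;
            if (needed > blockSize - TRAILER) {
                throw new IllegalArgumentException("record does not fit in one block");
            }
            if (block.position() + needed > blockSize - TRAILER) {
                block.putInt(blockSize - TRAILER, count); // seal block: trailer = record count
                blocks.add(block);
                block = ByteBuffer.allocate(blockSize);
                count = 0;
            }
            block.putInt(r.length).put(r);
            count++;
        }
        block.putInt(blockSize - TRAILER, count);
        blocks.add(block);
        ByteBuffer file = ByteBuffer.allocate(blocks.size() * blockSize);
        for (ByteBuffer b : blocks) {
            file.put(b.array());
        }
        return file.array();
    }

    /** Decodes block `index` on its own, without reading any other block. */
    static List<byte[]> readBlock(byte[] file, int blockSize, int index) {
        ByteBuffer block = ByteBuffer.wrap(file, index * blockSize, blockSize).slice();
        int count = block.getInt(blockSize - TRAILER);
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] r = new byte[block.getInt()];
            block.get(r);
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) {
        List<byte[]> records = new ArrayList<>();
        for (String s : new String[] {"a", "bb", "ccc", "dddd"}) {
            records.add(s.getBytes(StandardCharsets.UTF_8));
        }
        byte[] file = write(records, 16);
        // each block could be handed to a different parallel reader
        for (int i = 0; i < file.length / 16; i++) {
            List<String> decoded = new ArrayList<>();
            for (byte[] r : readBlock(file, 16, i)) {
                decoded.add(new String(r, StandardCharsets.UTF_8));
            }
            System.out.println("block " + i + ": " + decoded);
        }
    }
}
```

With a 16-byte block this prints `block 0: [a, bb]`, `block 1: [ccc]` and `block 2: [dddd]`; a reader handed only block 1 still finds "ccc" without scanning block 0.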

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
Done through https://issues.apache.org/jira/browse/FLINK-2503

Thanks again,
Flavio

On Mon, Aug 10, 2015 at 12:11 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Congrats that you got your InputFormat working!
> It is true, there can be a few inconsistencies in the Formats derived from
> FileInputFormat.
>
> It would be great if you could open JIRAs for these issues. Otherwise, the
> might get lost on the mailing list.
>
> Thanks, Fabian
>
> 2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>
>> Hi Fabian,
>> thanks to your help I finally managed to successfully generate a DataSet
>> from my folder but I think that there are some inconsistencies in the
>> hierarchy of InputFormats.
>> The *BinaryOutputFormat*/*TypeSerializerInputFormat* should somehow
>> inherit the behaviour of the FileInputFormat (so respect *unsplittable*
>> and *enumerateNestedFiles*) while they doesn't take into account those
>> flags.
>> Moreover in the *TypeSerializerInputFormat* there's a *"// TODO: fix
>> this shit"* that maybe should be removed or fixed :)
>>
>> Also maintaing aligned testForUnsplittable and decorateInputStream is
>> somehow dangerous..
>> And maybe visibility for getBlockIndexForPosition should be changed to
>> protected?
>>
>> So basically, my needs was to implement
>> a TypeSerializerInputFormat<RowBundle> but to achieve that I had to make a
>> lot of overrides..am I doing something wrong or are those inputFormat
>> somehow to improve..? This is my IF code (*remark*: from the comment *"Copied
>> from FileInputFormat (override TypeSerializerInputFormat)"* on the code
>> is copied-and-pasted from FileInputFormat..thus MY code ends there):
>>
>> public class RowBundleInputFormat extends
>> TypeSerializerInputFormat<RowBundle> {
>>
>> private static final long serialVersionUID = 1L;
>> private static final Logger LOG =
>> LoggerFactory.getLogger(RowBundleInputFormat.class);
>>
>> /** The fraction that the last split may be larger than the others. */
>> private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
>> private boolean objectRead;
>>
>> public RowBundleInputFormat() {
>> super(new GenericTypeInfo<>(RowBundle.class));
>> unsplittable = true;
>> }
>>
>> @Override
>> protected FSDataInputStream decorateInputStream(FSDataInputStream
>> inputStream, FileInputSplit fileSplit) throws Throwable {
>> return inputStream;
>> }
>>
>> @Override
>> protected boolean testForUnsplittable(FileStatus pathFile) {
>> return true;
>> }
>>
>> @Override
>> public void open(FileInputSplit split) throws IOException {
>> super.open(split);
>> objectRead = false;
>> }
>>
>> @Override
>> public boolean reachedEnd() throws IOException {
>> return this.objectRead;
>> }
>>
>> @Override
>> public RowBundle nextRecord(RowBundle reuse) throws IOException {
>> RowBundle yourObject = super.nextRecord(reuse);
>> this.objectRead = true; // read only one object
>> return yourObject;
>> }
>>
>> // -------------------------------------------------------------------
>> // Copied from FileInputFormat (override TypeSerializerInputFormat)
>> // -------------------------------------------------------------------
>> @Override
>> public FileInputSplit[] createInputSplits(int minNumSplits)
>> throws IOException {
>> if (minNumSplits < 1) {
>> throw new IllegalArgumentException(
>> "Number of input splits has to be at least 1.");
>> }
>>
>> // take the desired number of splits into account
>> minNumSplits = Math.max(minNumSplits, this.numSplits);
>>
>> final Path path = this.filePath;
>> final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(
>> minNumSplits);
>>
>> // get all the files that are involved in the splits
>> List<FileStatus> files = new ArrayList<FileStatus>();
>> long totalLength = 0;
>>
>> final FileSystem fs = path.getFileSystem();
>> final FileStatus pathFile = fs.getFileStatus(path);
>>
>> if (pathFile.isDir()) {
>> // input is directory. list all contained files
>> final FileStatus[] dir = fs.listStatus(path);
>> for (int i = 0; i < dir.length; i++) {
>> if (dir[i].isDir()) {
>> if (enumerateNestedFiles) {
>> if (acceptFile(dir[i])) {
>> totalLength += addNestedFiles(dir[i].getPath(),
>> files, 0, true);
>> } else {
>> if (LOG.isDebugEnabled()) {
>> LOG.debug("Directory "
>> + dir[i].getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> }
>> } else {
>> if (acceptFile(dir[i])) {
>> files.add(dir[i]);
>> totalLength += dir[i].getLen();
>> // as soon as there is one deflate file in a directory,
>> // we can not split it
>> testForUnsplittable(dir[i]);
>> } else {
>> if (LOG.isDebugEnabled()) {
>> LOG.debug("File "
>> + dir[i].getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> }
>> }
>> } else {
>> testForUnsplittable(pathFile);
>>
>> files.add(pathFile);
>> totalLength += pathFile.getLen();
>> }
>> // returns if unsplittable
>> if (unsplittable) {
>> int splitNum = 0;
>> for (final FileStatus file : files) {
>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>> 0, file.getLen());
>> Set<String> hosts = new HashSet<String>();
>> for (BlockLocation block : blocks) {
>> hosts.addAll(Arrays.asList(block.getHosts()));
>> }
>> long len = file.getLen();
>> if (testForUnsplittable(file)) {
>> len = READ_WHOLE_SPLIT_FLAG;
>> }
>> FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), 0, len, hosts.toArray(new String[hosts
>> .size()]));
>> inputSplits.add(fis);
>> }
>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>> }
>>
>> final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
>> : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0
>> : 1));
>>
>> // now that we have the files, generate the splits
>> int splitNum = 0;
>> for (final FileStatus file : files) {
>>
>> final long len = file.getLen();
>> final long blockSize = file.getBlockSize();
>>
>> final long minSplitSize;
>> if (this.minSplitSize <= blockSize) {
>> minSplitSize = this.minSplitSize;
>> } else {
>> if (LOG.isWarnEnabled()) {
>> LOG.warn("Minimal split size of " + this.minSplitSize
>> + " is larger than the block size of " + blockSize
>> + ". Decreasing minimal split size to block size.");
>> }
>> minSplitSize = blockSize;
>> }
>>
>> final long splitSize = Math.max(minSplitSize,
>> Math.min(maxSplitSize, blockSize));
>> final long halfSplit = splitSize >>> 1;
>>
>> final long maxBytesForLastSplit = (long) (splitSize *
>> MAX_SPLIT_SIZE_DISCREPANCY);
>>
>> if (len > 0) {
>>
>> // get the block locations and make sure they are in order with
>> // respect to their offset
>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>> 0, len);
>> Arrays.sort(blocks);
>>
>> long bytesUnassigned = len;
>> long position = 0;
>>
>> int blockIndex = 0;
>>
>> while (bytesUnassigned > maxBytesForLastSplit) {
>> // get the block containing the majority of the data
>> blockIndex = getBlockIndexForPosition(blocks, position,
>> halfSplit, blockIndex);
>> // create a new split
>> FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), position, splitSize,
>> blocks[blockIndex].getHosts());
>> inputSplits.add(fis);
>>
>> // adjust the positions
>> position += splitSize;
>> bytesUnassigned -= splitSize;
>> }
>>
>> // assign the last split
>> if (bytesUnassigned > 0) {
>> blockIndex = getBlockIndexForPosition(blocks, position,
>> halfSplit, blockIndex);
>> final FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), position, bytesUnassigned,
>> blocks[blockIndex].getHosts());
>> inputSplits.add(fis);
>> }
>> } else {
>> // special case with a file of zero bytes size
>> final BlockLocation[] blocks = fs.getFileBlockLocations(file,
>> 0, 0);
>> String[] hosts;
>> if (blocks.length > 0) {
>> hosts = blocks[0].getHosts();
>> } else {
>> hosts = new String[0];
>> }
>> final FileInputSplit fis = new FileInputSplit(splitNum++,
>> file.getPath(), 0, 0, hosts);
>> inputSplits.add(fis);
>> }
>> }
>>
>> return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
>> }
>>
>> /**
>> * Recursively traverse the input directory structure and enumerate all
>> * accepted nested files.
>> *
>> * @return the total length of accepted files.
>> */
>> private long addNestedFiles(Path path, List<FileStatus> files, long
>> length,
>> boolean logExcludedFiles) throws IOException {
>> final FileSystem fs = path.getFileSystem();
>>
>> for (FileStatus dir : fs.listStatus(path)) {
>> if (dir.isDir()) {
>> if (acceptFile(dir)) {
>> addNestedFiles(dir.getPath(), files, length,
>> logExcludedFiles);
>> } else {
>> if (logExcludedFiles && LOG.isDebugEnabled()) {
>> LOG.debug("Directory "
>> + dir.getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> } else {
>> if (acceptFile(dir)) {
>> files.add(dir);
>> length += dir.getLen();
>> testForUnsplittable(dir);
>> } else {
>> if (logExcludedFiles && LOG.isDebugEnabled()) {
>> LOG.debug("File "
>> + dir.getPath().toString()
>> + " did not pass the file-filter and is excluded.");
>> }
>> }
>> }
>> }
>> return length;
>> }
>>
>> /**
>> * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
>> * of the file described by the given offset.
>> *
>> * @param blocks
>> *            The different blocks of the file. Must be ordered by their
>> *            offset.
>> * @param offset
>> *            The offset of the position in the file.
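>> * @param halfSplitSize
>> *            Half of the split size. If less than this remains in the
>> *            block containing the offset, the next block is returned.
>> * @param startIndex
>> *            The earliest index to look at.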
>> * @return The index of the block containing the given position.
>> */
>> private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
>> long halfSplitSize, int startIndex) {
>> // go over all indexes after the startIndex
>> for (int i = startIndex; i < blocks.length; i++) {
>> long blockStart = blocks[i].getOffset();
>> long blockEnd = blockStart + blocks[i].getLength();
>>
>> if (offset >= blockStart && offset < blockEnd) {
>> // got the block where the split starts
>> // check if the next block contains more than this one does
>> if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
>> return i + 1;
>> } else {
>> return i;
>> }
>> }
>> }
>> throw new IllegalArgumentException("The given offset is not contained in any block.");
>> }
>>
>> }
>>
>>
>>
>>
>> On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> You need to do something like this:
>>>
>>> public class YourInputFormat extends FileInputFormat<Object> {
>>>
>>>    private boolean objectRead;
>>>
>>>    @Override
>>>    public FileInputSplit[] createInputSplits(int minNumSplits) {
>>>       // Create one FileInputSplit for each file you want to read.
>>>       // Check FileInputFormat for how to recursively enumerate files.
>>>       // Input splits must start at 0 and have a length equal to the
>>>       // length of the file to read.
>>>       return null;
>>>    }
>>>
>>>    @Override
>>>    public void open(FileInputSplit split) throws IOException {
>>>       super.open(split);
>>>       objectRead = false;
>>>    }
>>>
>>>    @Override
>>>    public boolean reachedEnd() throws IOException {
>>>       return this.objectRead;
>>>    }
>>>
>>>    @Override
>>>    public Object nextRecord(Object reuse) throws IOException {
>>>       // use Kryo here to read the object from this.stream
>>>       Object yourObject = this.stream.read();
>>>       this.objectRead = true; // read only one object
>>>       return yourObject;
>>>    }
>>> }
>>>
>>> 2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>
>>>> Sorry Fabian but I don't understand what I should do :(
>>>> Could you provide me a simple snippet of code to achieve this?
>>>>
>>>> On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Enumeration of nested files is a feature of the FileInputFormat.
>>>>> If you implement your own IF based on FileInputFormat as I suggested
>>>>> before, you can use that feature.
>>>>>
>>>>> 2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>
>>>>>> I have a directory containing a list of files, each one containing a
>>>>>> Kryo-serialized object.
>>>>>> With JSON-serialized objects I don't have that problem (but there I
>>>>>> use env.readTextFile(path).withParameters(parameters),
>>>>>> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
>>>>>>
>>>>>> On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't know your use case.
>>>>>>> The InputFormat interface is very flexible. Directories can be
>>>>>>> recursively read. A file can contain one or more objects. You can also make
>>>>>>> a smarter IF and put multiple (small) files into one split...
>>>>>>>
>>>>>>> It is up to your use case what you need to implement.
>>>>>>>
>>>>>>>
>>>>>>> 2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <po...@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> Is this also the case when just recursively reading an entire
>>>>>>>> directory containing one object per file?
>>>>>>>>
>>>>>>>> On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fh...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You could implement your own InputFormat based on FileInputFormat
>>>>>>>>> and override the createInputSplits method to just create a single split
>>>>>>>>> per file.
>>>>>>>>>
>>>>>>>>> 2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it>:
>>>>>>>>>
>>>>>>>>>> So what should I do?
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Ah, I checked the code.
>>>>>>>>>>>
>>>>>>>>>>> The BinaryInputFormat expects metadata which is written by the
>>>>>>>>>>> BinaryOutputFormat.
>>>>>>>>>>> So you cannot use the BinaryInputFormat to read a file which
>>>>>>>>>>> does not provide the metadata.
>>>>>>>>>>>
>>>>>>>>>>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>> pompermaier@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> The file containing the serialized object is 7 bytes
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <
>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This might be an issue with the blockSize parameter of the
>>>>>>>>>>>>> BinaryInputFormat.
>>>>>>>>>>>>> How large is the file with the single object?
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>> pompermaier@okkam.it>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also tried with
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> DataSet<RowBundle> ds =
>>>>>>>>>>>>>> env.createInput(inputFormat).setParallelism(1);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> but I get the same error :(
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Moreover, in this example I put exactly one object per file
>>>>>>>>>>>>>> so it should be able to deserialize it, right?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <
>>>>>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you create your file by just sequentially writing all
>>>>>>>>>>>>>>> objects to the file using Kryo, you can only read it with a parallelism of
>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>> Writing binary files in a way that they can be read in
>>>>>>>>>>>>>>> parallel is a bit tricky (and not specific to Flink).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>>>>>>> pompermaier@okkam.it>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>> I'm trying to read a file serialized with Kryo but I get
>>>>>>>>>>>>>>>> this exception (because createInputSplits creates 8
>>>>>>>>>>>>>>>> input splits, of which just one is not empty).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>>>>>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>>>>>>>>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>>>>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>>>>>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -----------------------------------------------
>>>>>>>>>>>>>>>> My program is basically the following:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>> //try-with-resources used to autoclose resources
>>>>>>>>>>>>>>>> try (Output output = new Output(new
>>>>>>>>>>>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>>>>>>>>>> //serialise object
>>>>>>>>>>>>>>>> Kryo kryo=new Kryo();
>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> //deserialise object
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> myObj=null;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> try (Input input = new Input( new
>>>>>>>>>>>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>>>>>>>>>>>     Kryo kryo=new Kryo();
>>>>>>>>>>>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>> } catch (FileNotFoundException ex) {
>>>>>>>>>>>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> final ExecutionEnvironment env =
>>>>>>>>>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>>>>>>>>>>>> MyClassSerializer.class);
>>>>>>>>>>>>>>>> Configuration configuration = new Configuration();
>>>>>>>>>>>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>>>>>>>>>>>> 64*1024*1024);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>>>>>>>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>>>>>>>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>>>>>>>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>>>>>>>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>>>>>>>>>> inputFormat.configure(configuration);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>>>>>>>>>> ds.print();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> private static final class MyClassSerializer extends
>>>>>>>>>>>>>>>> Serializer<MyClass> {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>> public void write(Kryo kryo, Output output, MyClass object)
>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>> kryo.writeClassAndObject(output, object);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass>
>>>>>>>>>>>>>>>> type) {
>>>>>>>>>>>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Am I doing something wrong?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>>
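Fabian's YourInputFormat skeleton quoted above leaves the actual deserialization as a placeholder (`this.stream.read()`). The life cycle it describes (reset a flag in open(), report reachedEnd() once the single object has been returned) can be tried outside Flink with a small JDK-only sketch. Assumptions: SingleObjectReader is a hypothetical class, not a Flink type, and plain Java serialization (ObjectInputStream) stands in for Kryo's readClassAndObject so the example runs without extra dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical sketch of a "one object per split" reader. Java serialization
// stands in for Kryo; the open/reachedEnd/nextRecord protocol is the point.
final class SingleObjectReader {

    private InputStream stream;
    private boolean objectRead;

    /** Like open(split): remember the split's stream and reset the flag. */
    void open(InputStream splitStream) {
        this.stream = splitStream;
        this.objectRead = false;
    }

    /** The split is exhausted as soon as its single object has been returned. */
    boolean reachedEnd() {
        return objectRead;
    }

    /** Deserializes the one object stored in the split (Kryo would go here). */
    Object nextRecord() throws IOException, ClassNotFoundException {
        Object result = new ObjectInputStream(stream).readObject();
        objectRead = true; // read only one object
        return result;
    }

    /** Demo helper: serializes a single object into a byte array. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        SingleObjectReader reader = new SingleObjectReader();
        reader.open(new ByteArrayInputStream(serialize("hello")));
        int records = 0;
        while (!reader.reachedEnd()) {
            System.out.println(reader.nextRecord()); // prints "hello"
            records++;
        }
        System.out.println(records + " record(s)"); // prints "1 record(s)"
    }
}
```

The pattern only works if every split holds one whole file starting at offset 0, so the reader never has to find an object boundary in the middle of the data.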

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
Congrats on getting your InputFormat working!
You're right, there can be a few inconsistencies in the formats derived from
FileInputFormat.

It would be great if you could open JIRAs for these issues. Otherwise, they
might get lost on the mailing list.

Thanks, Fabian
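The approach suggested earlier in the thread, a createInputSplits() that produces one split per file (starting at offset 0 and spanning the whole file) combined with recursive enumeration of nested directories, can be sketched with java.nio.file alone. WholeFileSplits and WholeFileSplit are hypothetical illustration types; a Flink implementation would build FileInputSplit objects with host information instead, as RowBundleInputFormat does.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: one split per file, found by recursive enumeration.
final class WholeFileSplits {

    /** Illustrative stand-in for Flink's FileInputSplit (no host information). */
    static final class WholeFileSplit {
        final int splitNumber;
        final Path path;
        final long start;
        final long length;

        WholeFileSplit(int splitNumber, Path path, long start, long length) {
            this.splitNumber = splitNumber;
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "split " + splitNumber + ": " + path + " start=" + start + " length=" + length;
        }
    }

    /** Walks root recursively and returns one whole-file split per regular file. */
    static List<WholeFileSplit> createSplits(Path root) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) { // descends into nested directories
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<WholeFileSplit> splits = new ArrayList<>();
        for (Path file : files) {
            // Start at 0 and cover the whole file, so a reader never has to
            // locate an object boundary in the middle of the data.
            splits.add(new WholeFileSplit(splits.size(), file, 0, Files.size(file)));
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("kryo-objects");
        Files.createDirectories(root.resolve("nested"));
        Files.write(root.resolve("a.ser"), new byte[7]);
        Files.write(root.resolve("nested").resolve("b.ser"), new byte[12]);
        createSplits(root).forEach(System.out::println);
    }
}
```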

2015-08-10 12:02 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Fabian,
thanks to your help I finally managed to generate a DataSet
from my folder, but I think there are some inconsistencies in the
hierarchy of InputFormats.
The *BinaryInputFormat*/*TypeSerializerInputFormat* should somehow inherit
the behaviour of the FileInputFormat (i.e. respect the *unsplittable* and
*enumerateNestedFiles* flags), but they don't take those flags into account.
Moreover, in the *TypeSerializerInputFormat* there's a *"// TODO: fix this
shit"* that should probably be removed or fixed :)
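How the *unsplittable* flag shapes split generation in the FileInputFormat.createInputSplits() code that Flavio copies into RowBundleInputFormat can be reduced to the following arithmetic. This is a simplified sketch: block locations, host assignment, READ_WHOLE_SPLIT_FLAG and the minNumSplits-derived maximum split size are left out (splitSize is simply passed in), and SplitPlanner is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the split arithmetic in FileInputFormat.createInputSplits.
final class SplitPlanner {

    /** The fraction that the last split may be larger than the others (as in the copied code). */
    static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    /** Returns {start, length} pairs for a single file of the given length. */
    static List<long[]> plan(long fileLength, long splitSize, boolean unsplittable) {
        List<long[]> splits = new ArrayList<>();
        if (unsplittable || fileLength == 0) {
            // Whole file in one split; a zero-length file also gets one split.
            splits.add(new long[] {0, fileLength});
            return splits;
        }
        final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
        long position = 0;
        long bytesUnassigned = fileLength;
        // Cut full-size splits while the remainder is too large to be the last split.
        while (bytesUnassigned > maxBytesForLastSplit) {
            splits.add(new long[] {position, splitSize});
            position += splitSize;
            bytesUnassigned -= splitSize;
        }
        // The remainder (up to 10% larger than splitSize) becomes the last split.
        if (bytesUnassigned > 0) {
            splits.add(new long[] {position, bytesUnassigned});
        }
        return splits;
    }

    public static void main(String[] args) {
        for (long[] s : plan(250, 100, false)) {
            System.out.println("start=" + s[0] + " length=" + s[1]);
        }
        // prints start=0 length=100, start=100 length=100, start=200 length=50
        System.out.println(plan(250, 100, true).size()); // prints 1
    }
}
```

With a split size of 100 bytes, a 250-byte file yields splits of 100, 100 and 50 bytes, while a 210-byte file yields 100 and 110 bytes, because the last split may exceed the split size by up to 10%.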

Also, keeping testForUnsplittable and decorateInputStream aligned is
somewhat dangerous.
And maybe the visibility of getBlockIndexForPosition should be changed to
protected?
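The alignment concern arises because the two methods decide related things independently: in the copied code, the comment next to testForUnsplittable() ties it to deflate files, which decorateInputStream() presumably has to decompress, and RowBundleInputFormat overrides one to always return true and the other to never wrap. One way to make the two impossible to drift apart is to derive both answers from a single table. This is a hypothetical sketch (CompressionRegistry and its methods are not Flink APIs), using java.util.zip for the .deflate case.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch: one suffix table drives both "is it unsplittable?"
// and "how must the stream be wrapped?", so the answers cannot disagree.
final class CompressionRegistry {

    private static final Map<String, UnaryOperator<InputStream>> DECOMPRESSORS =
            Map.of(".deflate", InflaterInputStream::new);

    /** Plays the role of testForUnsplittable: compressed files must be read whole. */
    static boolean isUnsplittable(String fileName) {
        return decompressorFor(fileName) != null;
    }

    /** Plays the role of decorateInputStream: wrap only if a decompressor is registered. */
    static InputStream decorate(InputStream in, String fileName) {
        UnaryOperator<InputStream> wrapper = decompressorFor(fileName);
        return wrapper == null ? in : wrapper.apply(in);
    }

    private static UnaryOperator<InputStream> decompressorFor(String fileName) {
        for (Map.Entry<String, UnaryOperator<InputStream>> e : DECOMPRESSORS.entrySet()) {
            if (fileName.endsWith(e.getKey())) {
                return e.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(compressed)) {
            out.write("payload".getBytes(StandardCharsets.UTF_8));
        }
        InputStream in = decorate(new ByteArrayInputStream(compressed.toByteArray()), "part-0.deflate");
        String text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        System.out.println(isUnsplittable("part-0.deflate") + " " + text); // prints "true payload"
        System.out.println(isUnsplittable("part-0.ser")); // prints "false"
    }
}
```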

So basically, what I needed was
a TypeSerializerInputFormat<RowBundle>, but to achieve that I had to make a
lot of overrides. Am I doing something wrong, or could those InputFormats
be improved? This is my IF code (*remark*: everything from the comment *"Copied
from FileInputFormat (override TypeSerializerInputFormat)"* onwards is
copied-and-pasted from FileInputFormat, so MY code ends there):

public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

    /** The fraction that the last split may be larger than the others. */
    private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
    private boolean objectRead;

    public RowBundleInputFormat() {
        super(new GenericTypeInfo<>(RowBundle.class));
        unsplittable = true;
    }

    @Override
    protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
            FileInputSplit fileSplit) throws Throwable {
        return inputStream;
    }

    @Override
    protected boolean testForUnsplittable(FileStatus pathFile) {
        return true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        objectRead = false;
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return this.objectRead;
    }

    @Override
    public RowBundle nextRecord(RowBundle reuse) throws IOException {
        RowBundle yourObject = super.nextRecord(reuse);
        this.objectRead = true; // read only one object
        return yourObject;
    }

    // -------------------------------------------------------------------
    // Copied from FileInputFormat (override TypeSerializerInputFormat)
    // -------------------------------------------------------------------
    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (minNumSplits < 1) {
            throw new IllegalArgumentException(
                    "Number of input splits has to be at least 1.");
        }

        // take the desired number of splits into account
        minNumSplits = Math.max(minNumSplits, this.numSplits);

        final Path path = this.filePath;
        final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);

        // get all the files that are involved in the splits
        List<FileStatus> files = new ArrayList<FileStatus>();
        long totalLength = 0;

        final FileSystem fs = path.getFileSystem();
        final FileStatus pathFile = fs.getFileStatus(path);

        if (pathFile.isDir()) {
            // input is directory. list all contained files
            final FileStatus[] dir = fs.listStatus(path);
            for (int i = 0; i < dir.length; i++) {
                if (dir[i].isDir()) {
                    if (enumerateNestedFiles) {
                        if (acceptFile(dir[i])) {
                            totalLength += addNestedFiles(dir[i].getPath(), files, 0, true);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Directory " + dir[i].getPath().toString()
                                        + " did not pass the file-filter and is excluded.");
                            }
                        }
                    }
                } else {
                    if (acceptFile(dir[i])) {
                        files.add(dir[i]);
                        totalLength += dir[i].getLen();
                        // as soon as there is one deflate file in a directory,
                        // we can not split it
                        testForUnsplittable(dir[i]);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("File " + dir[i].getPath().toString()
                                    + " did not pass the file-filter and is excluded.");
                        }
                    }
                }
            }
        } else {
            testForUnsplittable(pathFile);

            files.add(pathFile);
            totalLength += pathFile.getLen();
        }
        // returns if unsplittable
        if (unsplittable) {
            int splitNum = 0;
            for (final FileStatus file : files) {
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
                Set<String> hosts = new HashSet<String>();
                for (BlockLocation block : blocks) {
                    hosts.addAll(Arrays.asList(block.getHosts()));
                }
                long len = file.getLen();
                if (testForUnsplittable(file)) {
                    len = READ_WHOLE_SPLIT_FLAG;
                }
                FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len,
                        hosts.toArray(new String[hosts.size()]));
                inputSplits.add(fis);
            }
            return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
        }

        final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE
                : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1));

        // now that we have the files, generate the splits
        int splitNum = 0;
        for (final FileStatus file : files) {

            final long len = file.getLen();
            final long blockSize = file.getBlockSize();

            final long minSplitSize;
            if (this.minSplitSize <= blockSize) {
                minSplitSize = this.minSplitSize;
            } else {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Minimal split size of " + this.minSplitSize
                            + " is larger than the block size of " + blockSize
                            + ". Decreasing minimal split size to block size.");
                }
                minSplitSize = blockSize;
            }

            final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
            final long halfSplit = splitSize >>> 1;

            final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

            if (len > 0) {

                // get the block locations and make sure they are in order with
                // respect to their offset
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
                Arrays.sort(blocks);

                long bytesUnassigned = len;
                long position = 0;

                int blockIndex = 0;

                while (bytesUnassigned > maxBytesForLastSplit) {
                    // get the block containing the majority of the data
                    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                    // create a new split
                    FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
                            splitSize, blocks[blockIndex].getHosts());
                    inputSplits.add(fis);

                    // adjust the positions
                    position += splitSize;
                    bytesUnassigned -= splitSize;
                }

                // assign the last split
                if (bytesUnassigned > 0) {
                    blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
                    final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(),
                            position, bytesUnassigned, blocks[blockIndex].getHosts());
                    inputSplits.add(fis);
                }
            } else {
                // special case with a file of zero bytes size
                final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
                String[] hosts;
                if (blocks.length > 0) {
                    hosts = blocks[0].getHosts();
                } else {
                    hosts = new String[0];
                }
                final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
                inputSplits.add(fis);
            }
        }

        return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
    }

    /**
     * Recursively traverse the input directory structure and enumerate all
     * accepted nested files.
     *
     * @return the total length of accepted files.
     */
    private long addNestedFiles(Path path, List<FileStatus> files, long length,
            boolean logExcludedFiles) throws IOException {
        final FileSystem fs = path.getFileSystem();

        for (FileStatus dir : fs.listStatus(path)) {
            if (dir.isDir()) {
                if (acceptFile(dir)) {
                    // keep the accumulated length returned by the recursive call
                    length = addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
                } else {
                    if (logExcludedFiles && LOG.isDebugEnabled()) {
                        LOG.debug("Directory " + dir.getPath().toString()
                                + " did not pass the file-filter and is excluded.");
                    }
                }
            } else {
                if (acceptFile(dir)) {
                    files.add(dir);
                    length += dir.getLen();
                    testForUnsplittable(dir);
                } else {
                    if (logExcludedFiles && LOG.isDebugEnabled()) {
                        LOG.debug("File " + dir.getPath().toString()
                                + " did not pass the file-filter and is excluded.");
                    }
                }
            }
        }
        return length;
    }

    /**
     * Retrieves the index of the <tt>BlockLocation</tt> that contains the part
     * of the file described by the given offset.
     *
     * @param blocks
     *            The different blocks of the file. Must be ordered by their
     *            offset.
     * @param offset
     *            The offset of the position in the file.
     * @param halfSplitSize
     *            Half of the split size; if fewer than this many bytes remain in
     *            the block containing the offset, the next block is chosen.
     * @param startIndex
     *            The earliest index to look at.
     * @return The index of the block containing the given position.
     */
    private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
            long halfSplitSize, int startIndex) {
        // go over all indexes after the startIndex
        for (int i = startIndex; i < blocks.length; i++) {
            long blockStart = blocks[i].getOffset();
            long blockEnd = blockStart + blocks[i].getLength();

            if (offset >= blockStart && offset < blockEnd) {
                // got the block where the split starts
                // check if the next block contains more than this one does
                if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
                    return i + 1;
                } else {
                    return i;
                }
            }
        }
        throw new IllegalArgumentException("The given offset is not contained in any block.");
    }

}
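The split arithmetic in the copied createInputSplits can be checked in isolation. The sketch below is a stdlib-only model (SplitSizeSketch and its parameters are hypothetical; block locations and hosts are ignored, and the unsplittable case is modelled as a single {0, len} split rather than READ_WHOLE_SPLIT_FLAG). It shows why, unless unsplittable is set, a tiny file read with a high parallelism gets cut into many tiny splits.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSizeSketch {

    // Same tolerance as MAX_SPLIT_SIZE_DISCREPANCY in the code above.
    static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;

    /** Returns {offset, length} pairs for one file, following createInputSplits' arithmetic. */
    static List<long[]> splits(long len, long totalLength, long blockSize,
                               long configuredMinSplitSize, int minNumSplits,
                               boolean unsplittable) {
        if (blockSize <= 0 || minNumSplits < 1 || totalLength < len) {
            throw new IllegalArgumentException("need blockSize > 0, minNumSplits >= 1, totalLength >= len");
        }
        List<long[]> result = new ArrayList<>();
        if (unsplittable || len == 0) {
            // one split covering the whole file
            result.add(new long[] {0, len});
            return result;
        }
        long maxSplitSize = totalLength / minNumSplits
                + (totalLength % minNumSplits == 0 ? 0 : 1);
        long minSplitSize = Math.min(configuredMinSplitSize, blockSize);
        long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
        long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

        long position = 0;
        long bytesUnassigned = len;
        while (bytesUnassigned > maxBytesForLastSplit) {
            result.add(new long[] {position, splitSize});
            position += splitSize;
            bytesUnassigned -= splitSize;
        }
        if (bytesUnassigned > 0) {
            // the last split may be up to 10% larger than the others
            result.add(new long[] {position, bytesUnassigned});
        }
        return result;
    }

    public static void main(String[] args) {
        // 100-byte file, 32-byte blocks, one requested split: 32 + 32 + 32 + 4
        for (long[] s : splits(100, 100, 32, 0, 1, false)) {
            System.out.println(s[0] + "+" + s[1]);
        }
        // 7-byte file, 8 requested splits: seven 1-byte splits
        System.out.println(splits(7, 7, 1024, 0, 8, false).size());
        // same file with unsplittable = true: a single whole-file split
        System.out.println(splits(7, 7, 1024, 0, 8, true).size());
    }
}
```

A 34-byte file with 32-byte blocks stays in one split, because 34 is within the 10% tolerance (35 bytes) allowed for the last split.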




On Sun, Aug 9, 2015 at 2:00 PM, Fabian Hueske <fh...@gmail.com> wrote:

> You need to do something like this:
>
> public class YourInputFormat extends FileInputFormat<Object> {
>
>    private boolean objectRead;
>
>    @Override
>    public FileInputSplit[] createInputSplits(int minNumSplits) {
>       // Create one FileInputSplit for each file you want to read.
>       // Check FileInputFormat for how to recursively enumerate files.
>       // Input splits must start at 0 and have a length equal to length of
>       // the file to read.
>       return null;
>    }
>
>    @Override
>    public void open(FileInputSplit split) throws IOException {
>       super.open(split);
>       objectRead = false;
>    }
>
>    @Override
>    public boolean reachedEnd() throws IOException {
>       return this.objectRead;
>    }
>
>    @Override
>    public Object nextRecord(Object reuse) throws IOException {
>       // use Kryo here to read from this.stream()
>       Object yourObject = this.stream.read();
>       this.objectRead = true; // read only one object
>       return yourObject;
>    }
> }


-- 

Flavio Pompermaier

*Development Department*
_______________________________________________
*OKKAM Srl - www.okkam.it <http://www.okkam.it/>*

*Phone:* +(39) 0461 283 702
*Fax:* + (39) 0461 186 6433
*Email:* pompermaier@okkam.it
*Headquarters:* Trento (Italy), via G.B. Trener 8
*Registered office:* Trento (Italy), via Segantini 23


Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
You need to do something like this:

public class YourInputFormat extends FileInputFormat<Object> {

   private boolean objectRead;

   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) {
      // Create one FileInputSplit for each file you want to read.
      // Check FileInputFormat for how to recursively enumerate files.
      // Input splits must start at 0 and have a length equal to length of
      // the file to read.
      return null;
   }

   @Override
   public void open(FileInputSplit split) throws IOException {
      super.open(split);
      objectRead = false;
   }

   @Override
   public boolean reachedEnd() throws IOException {
      return this.objectRead;
   }

   @Override
   public Object nextRecord(Object reuse) throws IOException {
      // use Kryo here to read from this.stream
      Object yourObject = this.stream.read();
      this.objectRead = true; // read only one object
      return yourObject;
   }
}
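The snippet above leaves createInputSplits and the Kryo call as placeholders. The lifecycle it describes can be tried without Flink or Kryo: open resets a flag, reachedEnd reports it, and nextRecord reads exactly one object from a whole-file split. This is a stdlib-only sketch under stated assumptions. Kryo is swapped for DataInputStream.readUTF, the open/reachedEnd/nextRecord/close methods only imitate the InputFormat contract, and OneObjectPerFileSketch and readAll are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class OneObjectPerFileSketch {

    private DataInputStream stream;
    private boolean objectRead;

    /** Imitates open(split) for a whole-file split starting at offset 0. */
    void open(Path file) throws IOException {
        stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(file)));
        objectRead = false; // reset for every split
    }

    boolean reachedEnd() {
        return objectRead;
    }

    String nextRecord() throws IOException {
        // Kryo would deserialize here; readUTF keeps the sketch stdlib-only.
        String value = stream.readUTF();
        objectRead = true; // exactly one object per file
        return value;
    }

    void close() throws IOException {
        if (stream != null) {
            stream.close();
            stream = null;
        }
    }

    /** Drives the lifecycle once per file, roughly as a data source does once per split. */
    static List<String> readAll(List<Path> files) throws IOException {
        OneObjectPerFileSketch format = new OneObjectPerFileSketch();
        List<String> records = new ArrayList<>();
        for (Path file : files) {
            format.open(file);
            try {
                while (!format.reachedEnd()) {
                    records.add(format.nextRecord());
                }
            } finally {
                format.close();
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("one-object-per-file");
        List<Path> files = new ArrayList<>();
        for (String name : new String[] {"a", "bb"}) {
            Path file = dir.resolve(name + ".bin");
            try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
                out.writeUTF(name);
            }
            files.add(file);
        }
        System.out.println(readAll(files)); // [a, bb]
    }
}
```

Because reachedEnd only depends on the flag, the file never needs metadata describing where records end, which is what distinguishes this approach from BinaryInputFormat.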

2015-08-07 14:40 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Sorry Fabian but I don't understand what I should do :(
> Could you provide me a simple snippet of code to achieve this?

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
Sorry Fabian but I don't understand what I should do :(
Could you provide me a simple snippet of code to achieve this?

On Fri, Aug 7, 2015 at 1:30 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Enumeration of nested files is a feature of the FileInputFormat.
> If you implement your own IF based on FileInputFormat as I suggested
> before, you can use that feature.

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
Enumeration of nested files is a feature of the FileInputFormat.
If you implement your own InputFormat (IF) based on FileInputFormat, as I
suggested before, you can use that feature.
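For illustration only, the recursive listing step can be sketched in plain Java, without Flink. It walks a directory tree and keeps the regular files, skipping names that start with "." or "_". That filter is an assumption made for this sketch, not necessarily Flink's exact rule. The class and method names are hypothetical. In Flink itself the switch is FileInputFormat.ENUMERATE_NESTED_FILES_FLAG, set to true in a Configuration and passed to the source with withParameters, as in the readTextFile call mentioned in this thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestedFileLister {

    // Assumed filter for this sketch: skip hidden files and marker files such as "_SUCCESS".
    static boolean accept(String fileName) {
        return !fileName.startsWith(".") && !fileName.startsWith("_");
    }

    // Recursively collects every accepted regular file below root, in sorted order,
    // so that each file can later be handed to a reader as one unit of work.
    static List<Path> listNestedFiles(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> accept(p.getFileName().toString()))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a small nested directory: two object files and one marker file.
        Path root = Files.createTempDirectory("kryo-objects");
        Files.createDirectories(root.resolve("nested"));
        Files.write(root.resolve("a.ser"), new byte[] {1, 2, 3});
        Files.write(root.resolve("nested").resolve("b.ser"), new byte[] {4, 5, 6, 7});
        Files.write(root.resolve("_SUCCESS"), new byte[0]);

        // Prints a.ser and then nested/b.ser (with Unix path separators).
        for (Path p : listNestedFiles(root)) {
            System.out.println(root.relativize(p));
        }
    }
}
```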

2015-08-07 12:29 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> I have a directory containing a list of files, each one containing a
> Kryo-serialized object.
> With JSON-serialized objects I don't have that problem (but there I use
> env.readTextFile(path).withParameters(parameters),
> where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
I have a directory containing a list of files, each one containing a
Kryo-serialized object.
With JSON-serialized objects I don't have that problem (but there I use
env.readTextFile(path).withParameters(parameters),
where parameters has the ENUMERATE_NESTED_FILES_FLAG set to true).
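The "one object per file" layout can be read without any block metadata: each file is read completely and decoded as a single record. The sketch below is plain Java and deliberately swaps Kryo for built-in Java serialization so that it runs with the JDK alone. A Kryo-based decoder would instead wrap the bytes in a Kryo Input and call readClassAndObject, as in the program earlier in this thread. All class and method names here are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class OneObjectPerFileReader {

    // Reads each file completely and decodes its bytes as exactly one record.
    // Because a file is never cut into pieces, no format metadata is needed.
    static <T> List<T> readAll(List<Path> files, Function<byte[], T> decoder) throws IOException {
        List<T> records = new ArrayList<>();
        for (Path file : files) {
            records.add(decoder.apply(Files.readAllBytes(file)));
        }
        return records;
    }

    // Stand-in encoder based on built-in Java serialization (NOT Kryo).
    static byte[] javaSerialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Stand-in decoder; a Kryo version would use new Input(data) and kryo.readClassAndObject(input).
    static Object javaDeserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("cannot decode record", e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("one-object-per-file");
        Path first = Files.write(dir.resolve("first.ser"), javaSerialize("first"));
        Path second = Files.write(dir.resolve("second.ser"), javaSerialize("second"));

        List<Object> records = readAll(Arrays.asList(first, second), OneObjectPerFileReader::javaDeserialize);
        System.out.println(records); // [first, second]
    }
}
```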

On Fri, Aug 7, 2015 at 12:14 PM, Fabian Hueske <fh...@gmail.com> wrote:

> I don't know your use case.
> The InputFormat interface is very flexible. Directories can be recursively
> read. A file can contain one or more objects. You can also write a smarter
> InputFormat (IF) that puts multiple (small) files into one split.
>
> It is up to your use case what you need to implement.

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
I don't know your use case.
The InputFormat interface is very flexible. Directories can be recursively
read. A file can contain one or more objects. You can also write a smarter
InputFormat (IF) that puts multiple (small) files into one split.

It is up to your use case what you need to implement.
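As a rough illustration of the "put multiple small files into one split" idea, here is a greedy grouping of files into groups of at most a target size. It is plain Java with hypothetical names, not Flink API. A real InputFormat would still have to turn each group into its own split type and read the grouped files one after another.

```java
import java.util.ArrayList;
import java.util.List;

public class SmallFileGrouper {

    /**
     * Greedily packs files (given by their sizes, in listing order) into groups whose
     * total size does not exceed targetBytes. A file larger than targetBytes gets a group
     * of its own. Each group stands for one split; files are never cut, so every
     * serialized object stays whole. Returns the file indices per group.
     */
    static List<List<Integer>> group(long[] sizes, long targetBytes) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int i = 0; i < sizes.length; i++) {
            // Close the current group if adding this file would exceed the target.
            if (!current.isEmpty() && currentBytes + sizes[i] > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(i);
            currentBytes += sizes[i];
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] sizes = {7, 120, 30, 500, 10, 10};
        System.out.println(group(sizes, 128)); // [[0, 1], [2], [3], [4, 5]]
    }
}
```

Greedy packing keeps the files in their listed order and never splits a file, which is the property that matters when each file holds exactly one object.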


2015-08-07 12:08 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Is this also needed when just recursively reading an entire directory
> that contains one object per file?

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
Is this also needed when just recursively reading an entire directory
that contains one object per file?

On Fri, Aug 7, 2015 at 12:04 PM, Fabian Hueske <fh...@gmail.com> wrote:

> You could implement your own InputFormat based on FileInputFormat and
> override the createInputSplits method to just create a single split per
> file.

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
You could implement your own InputFormat based on FileInputFormat and
override the createInputSplits method to just create a single split per
file.
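A simplified model of the suggested split strategy follows. It is plain Java with a hypothetical Split class rather than Flink's own split type. oneSplitPerFile covers each file with a single split starting at offset 0. evenSplits is a naive size-based splitter included only for contrast; it is not Flink's actual algorithm. In a real FileInputFormat subclass, the one-split-per-file logic would live in the overridden createInputSplits method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OneSplitPerFile {

    // Hypothetical stand-in for a file input split: the byte range [start, start + length) of one file.
    static final class Split {
        final String path;
        final long start;
        final long length;

        Split(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return path + "[" + start + ", " + (start + length) + ")";
        }
    }

    // Suggested strategy: exactly one split per file, covering all of its bytes,
    // so every reader sees a complete serialized object.
    static List<Split> oneSplitPerFile(Map<String, Long> fileLengths) {
        List<Split> splits = new ArrayList<>();
        for (Map.Entry<String, Long> e : fileLengths.entrySet()) {
            splits.add(new Split(e.getKey(), 0, e.getValue()));
        }
        return splits;
    }

    // Naive size-based splitting into n equal ranges (n > 0), shown only for contrast.
    static List<Split> evenSplits(String path, long length, int n) {
        List<Split> splits = new ArrayList<>();
        long chunk = (length + n - 1) / n; // ceiling division
        for (int i = 0; i < n; i++) {
            long start = Math.min(i * chunk, length);
            long end = Math.min(start + chunk, length);
            splits.add(new Split(path, start, end - start));
        }
        return splits;
    }

    public static void main(String[] args) {
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("/tmp/objects/a.ser", 7L);
        files.put("/tmp/objects/b.ser", 42L);
        // [/tmp/objects/a.ser[0, 7), /tmp/objects/b.ser[0, 42)]
        System.out.println(oneSplitPerFile(files));
        // Seven 1-byte ranges followed by one empty range [7, 7).
        System.out.println(evenSplits("/tmp/objects/a.ser", 7, 8));
    }
}
```

With 8 requested ranges over a 7-byte file, the naive splitter yields seven 1-byte ranges and one empty one, so no reader ever sees the whole object; one split per file avoids that by construction.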

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> So what should I do?

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
So what should I do?

On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Ah, I checked the code.
>
> The BinaryInputFormat expects metadata which is written by the
> BinaryOutputFormat.
> So you cannot use the BinaryInputFormat to read a file which does not
> provide the metadata.
>
> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>
>> The file containing the serialized object is 7 bytes
>>
>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> This might be an issue with the blockSize parameter of the
>>> BinaryInputFormat.
>>> How large is the file with the single object?
>>>
>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>
>>>> I also tried with
>>>>
>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>
>>>> but I get the same error :(
>>>>
>>>> Moreover, in this example I put exactly one object per file so it
>>>> should be able to deserialize it, right?
>>>>
>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> If you create your file by just sequentially writing all objects to
>>>>> the file using Kryo, you can only read it with a parallelism of 1.
>>>>> Writing binary files in a way that they can be read in parallel is a
>>>>> bit tricky (and not specific to Flink).
>>>>>
>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>
>>>>>> Hi to all,
>>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>> input splits, where just one is not empty...).
>>>>>>
>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>> at
>>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>> at
>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>> at
>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> -----------------------------------------------
>>>>>> My program is basically the following:
>>>>>>
>>>>>> public static void main(String[] args) throws Exception {
>>>>>>
>>>>>> ...
>>>>>> //try-with-resources used to autoclose resources
>>>>>> try (Output output = new Output(new
>>>>>> FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>> //serialise object
>>>>>> Kryo kryo=new Kryo();
>>>>>> kryo.writeClassAndObject(output, myObj);
>>>>>> } catch (FileNotFoundException ex) {
>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>> }
>>>>>>
>>>>>> //deserialise object
>>>>>>
>>>>>> myObj=null;
>>>>>>
>>>>>> try (Input input = new Input( new
>>>>>> FileInputStream("/tmp/KryoTest.ser"))){
>>>>>>     Kryo kryo=new Kryo();
>>>>>>     myObj =(MyClass)kryo.readClassAndObject(input);
>>>>>> } catch (FileNotFoundException ex) {
>>>>>> LOG.error(ex.getMessage(), ex);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> final ExecutionEnvironment env =
>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>> env.registerTypeWithKryoSerializer(MyClass.class,
>>>>>> MyClassSerializer.class);
>>>>>> Configuration configuration = new Configuration();
>>>>>> configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
>>>>>> 64*1024*1024);
>>>>>>
>>>>>> TypeInformation<MyClass> typeInfo = new
>>>>>> GenericTypeInfo<>(MyClass.class);
>>>>>> final BinaryInputFormat<MyClass> inputFormat = new
>>>>>> TypeSerializerInputFormat<>(typeInfo);
>>>>>> inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>> inputFormat.configure(configuration);
>>>>>>
>>>>>> DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>> ds.print();
>>>>>>
>>>>>> }
>>>>>>
>>>>>> private static final class MyClassSerializer extends
>>>>>> Serializer<MyClass> {
>>>>>>
>>>>>> @Override
>>>>>> public void write(Kryo kryo, Output output, MyClass object) {
>>>>>> kryo.writeClassAndObject(output, object);
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>> return (MyClass) kryo.readClassAndObject(input);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Am I doing something wrong?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
Ah, I checked the code.

The BinaryInputFormat expects metadata which is written by the
BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file which does not
provide the metadata.
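
To show what "metadata written by the BinaryOutputFormat" buys you, here is a minimal, self-contained sketch of a block-structured file in plain Java. The layout is a simplification invented for this example and is not Flink's actual on-disk format: 64-byte blocks, a 4-byte record count at the end of each block, and records that never cross a block boundary. The names BlockFileSketch, write and readBlock are hypothetical. Because each block carries its own footer, a reader can decode any block without reading the ones before it. A file produced by a bare kryo.writeClassAndObject call has no such footer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified block-structured binary file (NOT Flink's real layout).
 * The file is a sequence of BLOCK_SIZE-byte blocks; the last FOOTER_SIZE bytes
 * of each block store how many records the block contains, so any block can be
 * decoded on its own, the way a parallel reader would process its input split.
 */
public class BlockFileSketch {
    static final int BLOCK_SIZE = 64;  // hypothetical; tiny so the demo spans several blocks
    static final int FOOTER_SIZE = 4;  // one int: record count of this block

    /** Writes length-prefixed UTF-8 records; a record never straddles two blocks. */
    static byte[] write(List<String> records) {
        List<byte[]> blocks = new ArrayList<>();
        ByteBuffer block = ByteBuffer.allocate(BLOCK_SIZE);
        int count = 0;
        for (String r : records) {
            byte[] payload = r.getBytes(StandardCharsets.UTF_8);
            int needed = 4 + payload.length;
            if (needed > BLOCK_SIZE - FOOTER_SIZE) {
                throw new IllegalArgumentException("record too large for one block: " + r);
            }
            if (block.position() + needed > BLOCK_SIZE - FOOTER_SIZE) {
                block.putInt(BLOCK_SIZE - FOOTER_SIZE, count); // seal: write footer
                blocks.add(block.array());
                block = ByteBuffer.allocate(BLOCK_SIZE);
                count = 0;
            }
            block.putInt(payload.length).put(payload);
            count++;
        }
        block.putInt(BLOCK_SIZE - FOOTER_SIZE, count); // footer of the last block
        blocks.add(block.array());

        ByteBuffer file = ByteBuffer.allocate(blocks.size() * BLOCK_SIZE);
        for (byte[] b : blocks) {
            file.put(b);
        }
        return file.array();
    }

    /** Decodes one block without touching any other block. */
    static List<String> readBlock(byte[] file, int blockIndex) {
        ByteBuffer buf = ByteBuffer.wrap(file);
        int start = blockIndex * BLOCK_SIZE;
        int count = buf.getInt(start + BLOCK_SIZE - FOOTER_SIZE); // read metadata first
        buf.position(start);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[buf.getInt()];
            buf.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add("record-" + i);
        }
        byte[] file = write(records);
        for (int b = 0; b < file.length / BLOCK_SIZE; b++) {
            System.out.println("block " + b + ": " + readBlock(file, b));
        }
    }
}
```

With the ten demo records, main prints two blocks, record-0 to record-4 and record-5 to record-9, each decoded independently. In Flink itself, the usual way to get a file that BinaryInputFormat can read is to write it with a BinaryOutputFormat subclass, such as TypeSerializerOutputFormat (the counterpart of the TypeSerializerInputFormat used in the original post), rather than with Kryo directly. Check the documentation for your Flink version before relying on this.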

2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> The file containing the serialized object is 7 bytes.

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
The file containing the serialized object is 7 bytes.

On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fh...@gmail.com> wrote:

> This might be an issue with the blockSize parameter of the
> BinaryInputFormat.
> How large is the file with the single object?

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
This might be an issue with the blockSize parameter of the
BinaryInputFormat.
How large is the file with the single object?
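
Here is a rough way to see why the file size matters. A reader for a block-structured format typically cuts the file into blocks of blockSize bytes and expects each block's metadata in a fixed-size footer at the end of that block. The sketch below computes those positions. FOOTER_SIZE = 24 and the class name BlockBoundaries are assumptions made for illustration, not values taken from Flink's source. With the 64 MiB block size set in the original post, a 7-byte file becomes a single 7-byte block whose footer would have to start at offset -17. No valid metadata can exist there, which fits the conclusion reached later in this thread that the file lacks BinaryOutputFormat metadata.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Computes where a footer-based block reader would look for metadata.
 * FOOTER_SIZE is a hypothetical value chosen for illustration only.
 */
public class BlockBoundaries {
    static final long FOOTER_SIZE = 24; // assumption, e.g. three 8-byte longs

    /** Returns {start, length, footerOffset} for every block of the file. */
    static List<long[]> blocks(long fileLength, long blockSize) {
        List<long[]> result = new ArrayList<>();
        for (long start = 0; start < fileLength; start += blockSize) {
            long length = Math.min(blockSize, fileLength - start); // last block may be short
            long footerOffset = start + length - FOOTER_SIZE;      // last FOOTER_SIZE bytes
            result.add(new long[] {start, length, footerOffset});
        }
        return result;
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024; // block size used in the original post
        for (long[] b : blocks(7, blockSize)) {
            System.out.printf("start=%d length=%d footerOffset=%d%n", b[0], b[1], b[2]);
            if (b[2] < b[0]) {
                System.out.println("block is shorter than its footer: no metadata to read");
            }
        }
    }
}
```

For a larger file the same arithmetic gives plausible offsets. However, if the file was not written by a BinaryOutputFormat, the bytes at those offsets are still Kryo data rather than metadata.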

2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> I also tried with
>
> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>
> but I get the same error :(
>
> Moreover, in this example I put exactly one object per file, so it should
> be able to deserialize it, right?

Re: Invalid argument reading a file containing a Kryo object

Posted by Flavio Pompermaier <po...@okkam.it>.
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file, so it should be
able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fh...@gmail.com> wrote:

> If you create your file by just sequentially writing all objects to the
> file using Kryo, you can only read it with a parallelism of 1.
> Writing binary files in a way that they can be read in parallel is a bit
> tricky (and not specific to Flink).

Re: Invalid argument reading a file containing a Kryo object

Posted by Fabian Hueske <fh...@gmail.com>.
If you create your file by just sequentially writing all objects to the
file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit
tricky (and not specific to Flink).
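
One common technique for making a binary record file readable in parallel is to embed sync markers. A sync marker is a fixed byte sequence written between records. A reader handed an arbitrary byte range scans forward to the next marker instead of starting mid-record. Formats such as Hadoop SequenceFile and Avro object container files use this idea, with markers at coarser intervals rather than before every record. The sketch below is a deliberately simplified plain-Java version, and SyncMarkerSketch and its methods are hypothetical names. A plain Kryo stream contains no such markers, so a reader cannot tell where records start unless it begins at byte 0.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Simplified splittable record file using sync markers (the idea behind formats
 * such as Hadoop SequenceFile or Avro container files, heavily reduced).
 * Layout per record: MARKER (16 bytes) | payload length (int) | UTF-8 payload.
 */
public class SyncMarkerSketch {
    static final byte[] MARKER = new byte[16];

    static {
        // Real formats pick a random marker per file and store it in the header.
        new Random(42).nextBytes(MARKER);
        // Setting the high bit guarantees the marker never matches the ASCII
        // payloads or the small length prefixes used in this demo.
        for (int i = 0; i < MARKER.length; i++) {
            MARKER[i] |= (byte) 0x80;
        }
    }

    static byte[] write(List<String> records) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String r : records) {
            byte[] payload = r.getBytes(StandardCharsets.UTF_8);
            byte[] length = ByteBuffer.allocate(4).putInt(payload.length).array();
            out.write(MARKER, 0, MARKER.length);
            out.write(length, 0, length.length);
            out.write(payload, 0, payload.length);
        }
        return out.toByteArray();
    }

    /** Returns the first offset >= from at which MARKER starts, or -1. */
    static int findMarker(byte[] file, int from) {
        for (int i = from; i + MARKER.length <= file.length; i++) {
            if (Arrays.equals(Arrays.copyOfRange(file, i, i + MARKER.length), MARKER)) {
                return i;
            }
        }
        return -1;
    }

    /** Reads every record whose marker starts in [start, end). */
    static List<String> readSplit(byte[] file, int start, int end) {
        ByteBuffer buf = ByteBuffer.wrap(file);
        List<String> out = new ArrayList<>();
        int pos = findMarker(file, start); // scan past any partial record at the split start
        while (pos >= 0 && pos < end) {
            int lengthAt = pos + MARKER.length;
            int length = buf.getInt(lengthAt);
            out.add(new String(file, lengthAt + 4, length, StandardCharsets.UTF_8));
            pos = findMarker(file, lengthAt + 4 + length);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add("record-" + i);
        }
        byte[] file = write(records);
        int numSplits = 3;
        int splitSize = (file.length + numSplits - 1) / numSplits; // ceiling division
        List<String> all = new ArrayList<>();
        for (int s = 0; s < numSplits; s++) {
            int start = s * splitSize;
            int end = Math.min(file.length, start + splitSize);
            List<String> part = readSplit(file, start, end);
            System.out.println("split " + s + " [" + start + ", " + end + "): " + part);
            all.addAll(part);
        }
        System.out.println("all records read exactly once: " + all.equals(records));
    }
}
```

The rule "read every record whose marker starts in [start, end)" guarantees that each record is read by exactly one split, even when a split boundary falls in the middle of a record. On the ten demo records, main distributes them across three splits (4, 3 and 3 records), with none lost or duplicated.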

2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Hi to all,
> I'm trying to read a file serialized with Kryo, but I get this exception
> (because createInputSplits creates 8 input splits, of which only one is
> non-empty).