Posted to user@beam.apache.org by Thomas Li Fredriksen <th...@cognite.com> on 2020/11/20 06:07:23 UTC

FileIO Azure Storage problems

Good morning everyone.


I am attempting to parse a very large CSV file (65 million lines) with
Beam (version 2.25) from an Azure blob and have created a pipeline for
this. I am running the pipeline on Dataflow and testing with a smaller
version of the file (10,000 lines).

I am using FileIO and the filesystem prefix "azfs" to read from Azure
blobs.
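
The relevant part of the read path looks roughly like this (a
simplified sketch, not my exact code; the account, container and blob
names are placeholders, and the Azure credential setup is omitted):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadCsvFromAzfs {
      public static void main(String[] args) {
        // Requires beam-sdks-java-io-azure on the classpath so that the
        // "azfs" filesystem scheme is registered.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Match the blob, then read it line by line.
        PCollection<String> lines =
            p.apply(FileIO.match().filepattern("azfs://myaccount/mycontainer/big.csv"))
                .apply(FileIO.readMatches())
                .apply(TextIO.readFiles());

        // CSV parsing and the rest of the pipeline go here.

        p.run().waitUntilFinish();
      }
    }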

The pipeline works with the small test file, but when I run it on the
bigger file I get a "Stream mark expired" exception (pasted below).
Reading the same file from a GCP bucket works just fine, including when
running on Dataflow.

Our primary file store is Azure Storage, so moving to GCP buckets is not
an option.

Is there anyone who can help me resolve this?


Best Regards
Thomas Li Fredriksen


The exception referenced above:


Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Stream mark expired.
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:114)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:417)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:386)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Stream mark expired.
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
    org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
    org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
    org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:280)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
    org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
    org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:335)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    ... 21 more
Caused by: java.lang.RuntimeException: Stream mark expired.
    com.azure.storage.common.StorageInputStream.reset(StorageInputStream.java:366)
    org.apache.beam.sdk.io.azure.blobstore.AzureReadableSeekableByteChannel.position(AzureReadableSeekableByteChannel.java:89)
    org.apache.beam.sdk.io.TextSource$TextBasedReader.startReading(TextSource.java:152)
    org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476)
    org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
    org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn.process(ReadAllViaFileBasedSource.java:113)



Re: FileIO Azure Storage problems

Posted by "Thomas Fredriksen(External)" <th...@cognite.com>.
Hi Pablo.

I will check out the PR and test it tonight or tomorrow with the large
CSV we talked about.

Fingers crossed :)


On Wed, Dec 2, 2020, 16:05 Pablo Estrada <pa...@google.com> wrote:

> Hi Thomas!
> This is my guess for a fix for this issue:
> https://github.com/apache/beam/pull/13441
> I have not tested it, so I am not sure that it will work. I wonder if
> you'd be willing to try it? (perhaps I can build a JAR for it... or... I'm
> not sure, what do you think?)
> Best
> -P.
>
> [earlier quoted messages and stack trace snipped]

Re: FileIO Azure Storage problems

Posted by Pablo Estrada <pa...@google.com>.
Hi Thomas!
This is my guess for a fix for this issue:
https://github.com/apache/beam/pull/13441
I have not tested it, so I am not sure that it will work. I wonder if you'd
be willing to try it? (perhaps I can build a JAR for it... or... I'm not
sure, what do you think?)
Best
-P.

On Fri, Nov 20, 2020 at 11:20 AM Thomas Fredriksen(External) <
thomas.fredriksen@cognite.com> wrote:

> Hi Pablo!
>
> The file I am trying to ingest is 54.2 GB unzipped. It is available here:
> https://obis.org/manual/access/
>
> I created a Jira issue for you, but seem unable to assign it:
> https://issues.apache.org/jira/browse/BEAM-11313
>
> Thank you so much for taking a look at this!
>
> Best Regards
> Thomas Li Fredriksen
>
> [earlier quoted messages and stack trace snipped]

Re: FileIO Azure Storage problems

Posted by "Thomas Fredriksen(External)" <th...@cognite.com>.
Hi Pablo!

The file I am trying to ingest is 54.2 GB unzipped. It is available here:
https://obis.org/manual/access/

I created a Jira issue for you, but seem unable to assign it:
https://issues.apache.org/jira/browse/BEAM-11313

Thank you so much for taking a look at this!

Best Regards
Thomas Li Fredriksen

On Fri, Nov 20, 2020 at 5:43 PM Pablo Estrada <pa...@google.com> wrote:

> Follow up: What is the size of the file you're consuming?
> -P.
>
> [earlier quoted messages and stack trace snipped]

Re: FileIO Azure Storage problems

Posted by Pablo Estrada <pa...@google.com>.
Follow up: What is the size of the file you're consuming?
-P.

On Fri, Nov 20, 2020 at 8:40 AM Pablo Estrada <pa...@google.com> wrote:

> Hi Thomas!
> This looks like it may be a bug with the azfs implementation. If you
> notice the code, you're hitting this issue when the byte channel needs to
> seek backwards.
> I may take a stab at fixing it. I believe we have to catch a mark
> expiration, and just reopen the file if that happens.
>
> Do you think you could create a JIRA issue to track it and assign it to me?
>
> [original message and stack trace snipped]

Re: FileIO Azure Storage problems

Posted by Pablo Estrada <pa...@google.com>.
Hi Thomas!
This looks like it may be a bug in the azfs implementation. If you look
at the code, you can see that you hit this issue when the byte channel
needs to seek backwards.
I may take a stab at fixing it. I believe we have to catch a mark
expiration, and just reopen the file if that happens.
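
Roughly this idea, as an untested sketch only (this is not the actual
channel code in the azfs filesystem, and the Azure SDK calls are from
memory, so take it with a grain of salt):

    import com.azure.storage.blob.BlobClient;
    import com.azure.storage.blob.models.BlobRange;
    import java.io.IOException;
    import java.io.InputStream;

    class ReopeningSeeker {
      private final BlobClient blob;
      private InputStream stream;

      ReopeningSeeker(BlobClient blob, InputStream stream) {
        this.blob = blob;
        this.stream = stream;
      }

      /** Seek backwards to newPosition, reopening the blob if the mark expired. */
      InputStream seekBackwards(long newPosition) throws IOException {
        try {
          // Backwards seeks are implemented with InputStream.mark()/reset();
          // reset() throws "Stream mark expired" once more bytes have been
          // read past the mark than its read limit allows.
          stream.reset();
          stream.skip(newPosition);
        } catch (RuntimeException markExpired) {
          // Mark expired: throw the stream away and reopen the blob at the
          // requested offset instead of failing the whole bundle.
          stream.close();
          stream = blob.openInputStream(new BlobRange(newPosition), null);
        }
        return stream;
      }
    }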

Do you think you could create a JIRA issue to track it and assign it to me?

On Thu, Nov 19, 2020 at 10:07 PM Thomas Li Fredriksen <
thomas.fredriksen@cognite.com> wrote:

> Good morning everyone.
>
>
> I am attempting to parse a very large CSV file (65 million lines) with
> Beam (version 2.25) from an Azure blob and have created a pipeline for
> this. I am running the pipeline on Dataflow and testing with a smaller
> version of the file (10,000 lines).
>
> I am using FileIO and the filesystem prefix "azfs" to read from Azure
> blobs.
>
> The pipeline works with the small test file, but when I run it on the
> bigger file I get a "Stream mark expired" exception (pasted below).
> Reading the same file from a GCP bucket works just fine, including when
> running on Dataflow.
>
> Our primary file store is Azure Storage, so moving to GCP buckets is not
> an option.
>
> Is there anyone who can help me resolve this?
>
>
> Best Regards
> Thomas Li Fredriksen
>
>
> [stack trace snipped]