Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/06/25 08:26:43 UTC

Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Hello All,

I am using Beam Java 2.19.0 on Google Dataflow.

I need urgent help in debugging one issue.

I recently added 3-4 new PTransforms to an existing pipeline, where I
read data from BQ for a certain timestamp and create a
PCollectionView<Map<Key, Value>> to be used as a side input in other
PTransforms.

i.e. something like this:

/**
 * Get PCollectionView Stats1
 */
PCollectionView<Map<Stats1Key, Stats1>> stats1View =
    jobCompleteStatus
        .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
        .apply("View_S1STATS", View.asSingleton());

/**
 * Get PCollectionView of Stats2
 */
PCollectionView<Map<Stats2Key, Stats2>> stats2View =
    jobCompleteStatus
        .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
        .apply("View_S2STATS", View.asSingleton());


and a couple more PTransforms like these. Here jobCompleteStatus is a message
received from PubSub that acts as a trigger to reload these views.

The moment I deployed the above pipeline, it didn't start, and
error reporting gave weird exceptions (see attached screenshot1 and
screenshot) which I don't know how to debug.


Then, as an experiment, I made a change where I enabled only one new
transformation and disabled the others. This time I didn't see any issue.

So it looks like some memory issue.

I also compared worker logs between the working and non-working cases,
and it looks like resources were not granted in the non-working case.

(See attached working-workerlogs and nonworking-workerlogs)

I couldn't find any other log.


I would really appreciate quick help here.


Thanks and Regards

Mohil

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
I have opened the following ticket:
https://issues.apache.org/jira/browse/BEAM-10339

Please let me know if there is some other place where I need to open a
support ticket.

Thank you
Mohil



On Fri, Jun 26, 2020 at 11:13 AM Mohil Khare <mo...@prosimo.io> wrote:

> Sure not a problem. I will open one.
>
> Thanks
> Mohil
>
> On Fri, Jun 26, 2020 at 10:55 AM Luke Cwik <lc...@google.com> wrote:
>
>> Sorry, I didn't open a support case. You should open the support case.
>>
>> On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Thanks a lot Luke for following up on this and opening a Dataflow
>>> support case. It would be good to know how the streaming engine solved the
>>> problem. I would really appreciate it if you could share a link to the
>>> support case once you open it (if that is possible).
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>>
>>>
>>> On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> It seems as though we have seen this failure before for Dataflow; it
>>>> was caused by side input tags needing to be unique in a streaming
>>>> pipeline.
>>>>
>>>> It looked like this used to be a common occurrence in the Python SDK[1,
>>>> 2] because it generated tags that weren't unique enough.
>>>>
>>>> I would open up a case with Dataflow support with all the information
>>>> you have provided here.
>>>>
>>>> 1: https://issues.apache.org/jira/browse/BEAM-4549
>>>> 2: https://issues.apache.org/jira/browse/BEAM-4534
>>>>
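>>>> For example, giving every view a unique pair of transform names (a sketch;
>>>> note that the step 3 snippet quoted below reuses "Reload_Stats1_FromBQ"
>>>> and "View_Stats1" for the second view):
>>>>
>>>>     PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>         analyticsDataStatusUpdates
>>>>             .apply("Reload_Stats1_FromBQ", new ReadStats1())
>>>>             .apply("View_Stats1", View.asSingleton());
>>>>
>>>>     PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>         analyticsDataStatusUpdates
>>>>             .apply("Reload_Stats2_FromBQ", new ReadStats2())  // unique name
>>>>             .apply("View_Stats2", View.asSingleton());        // unique name
>>>>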
>>>> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hi Luke and all,
>>>>>
>>>>> UPDATE: So when I started my job by *enabling the streaming engine
>>>>> and keeping the machine type default for the streaming engine
>>>>> (n1-standard-2)*, the pipeline started successfully.
>>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>>>
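>>>>> For reference, this is roughly how I enabled it in code (a sketch; I
>>>>> believe the --enableStreamingEngine pipeline flag does the same thing):
>>>>>
>>>>>     DataflowPipelineOptions options =
>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation()
>>>>>             .as(DataflowPipelineOptions.class);
>>>>>     options.setEnableStreamingEngine(true);
>>>>>     // Leaving workerMachineType unset lets the service pick the
>>>>>     // streaming engine default (n1-standard-2).
>>>>>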
>>>>> Still evaluating to make sure that there is no performance degradation
>>>>> by doing so.
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>>
>>>>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>>
>>>>>> Hi Luke,
>>>>>>
>>>>>> Let me give you some more details about the code.
>>>>>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>>>>>
>>>>>> Default machine type, which is n1-standard-4.
>>>>>> I didn't set numWorkerHarnessThreads (I believe Beam/Dataflow picks it
>>>>>> up based on the number of cores available).
>>>>>>
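>>>>>> If I ever need to pin it explicitly, I believe the debug options allow
>>>>>> something like the following (a sketch; I have not tried it):
>>>>>>
>>>>>>     options.as(DataflowPipelineDebugOptions.class)
>>>>>>         .setNumberOfWorkerHarnessThreads(16); // unset/0 = service default
>>>>>>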
>>>>>> 1: Code listens for some trigger on a pubsub topic:
>>>>>>
>>>>>>     /**
>>>>>>      * Read from PubSub for topic ANALYTICS_UPDATE and create
>>>>>>      * PCollection<POJORepresentingJobCompleteInfo> indicating the main
>>>>>>      * pipeline should reload relevant DataAnalyticsData from the BQ table.
>>>>>>      */
>>>>>>     static class MonitorPubSubForDailyAnalyticsDataStatus
>>>>>>             extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {
>>>>>>
>>>>>>         private final String subscriptionName;
>>>>>>         private final String jobProject;
>>>>>>
>>>>>>         MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
>>>>>>             this.subscriptionName = subscriptionName;
>>>>>>             this.jobProject = jobProject;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
>>>>>>             return input.getPipeline()
>>>>>>                 .apply("Read_PubSub_Messages",
>>>>>>                     PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>>>                         .fromSubscription(subscriptionName))
>>>>>>                 .apply("Applying_Windowing", Window.<PubsubMessage>into(new GlobalWindows())
>>>>>>                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>>                     .discardingFiredPanes())
>>>>>>                 .apply("Read_Update_Status",
>>>>>>                     ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
>>>>>>                         @ProcessElement
>>>>>>                         public void processElement(@Element PubsubMessage input,
>>>>>>                                 OutputReceiver<POJORepresentingJobCompleteInfo> out) {
>>>>>>                             /*** Read and create ***/
>>>>>>                             out.output(POJORepresentingJobCompleteInfo);
>>>>>>                         }
>>>>>>                     }));
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> 2: Get the latest updates and reload new data from various BQ tables using the google cloud bigquery client library (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries):
>>>>>>
>>>>>>     PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates =
>>>>>>         p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>>>>>>             new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));
>>>>>>
>>>>>>
>>>>>> 3. Create various PCollectionViews to be used as side inputs for decorating the stream of logs coming from Kafka (shown later):
>>>>>>
>>>>>>    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
>>>>>>        analyticsDataStatusUpdates
>>>>>>            .apply("Reload_Stats1_FromBQ", new ReadStats1())
>>>>>>            .apply("View_Stats1", View.asSingleton());
>>>>>>
>>>>>>    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
>>>>>>        analyticsDataStatusUpdates
>>>>>>            .apply("Reload_Stats1_FromBQ", new ReadStats2())
>>>>>>            .apply("View_Stats1", View.asSingleton());
>>>>>>
>>>>>>    ... and so on
>>>>>>
>>>>>>
>>>>>> 4. An example of the code where we read stats from BQ, i.e. in ReadStats1(), ReadStats2() and so on:
>>>>>>
>>>>>>     class ReadStatsS1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>,
>>>>>>             PCollection<Map<Stats1Key, Stats1>>> {
>>>>>>
>>>>>>         @Override
>>>>>>         public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
>>>>>>             return input
>>>>>>                 .apply("Read_From_BigQuery", ParDo.of(new BigQueryRread()))
>>>>>>                 .apply("Applying_Windowing", Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
>>>>>>                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>>                     .discardingFiredPanes());
>>>>>>         }
>>>>>>
>>>>>>         private class BigQueryRread extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {
>>>>>>             @ProcessElement
>>>>>>             public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
>>>>>>                 Map<Stats1Key, Stats1> resultMap = new HashMap<>();
>>>>>>                 try {
>>>>>>                     BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
>>>>>>                     // Some method that returns the desired sql query based on info present in input.
>>>>>>                     String sqlQuery = getSqlQuery(input);
>>>>>>                     QueryJobConfiguration queryJobConfiguration =
>>>>>>                         QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
>>>>>>                     // Create a job ID so that we can safely retry.
>>>>>>                     JobId jobId = JobId.of(UUID.randomUUID().toString());
>>>>>>                     Job queryJob = bigQueryClient.create(
>>>>>>                         JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
>>>>>>                     // Wait for the query to complete.
>>>>>>                     queryJob = queryJob.waitFor();
>>>>>>                     if (queryJob == null) {
>>>>>>                         logger.p1Error("Big Query Job no longer exists");
>>>>>>                     } else if (queryJob.getStatus().getError() != null) {
>>>>>>                         // You can also look at queryJob.getStatus().getExecutionErrors() for all
>>>>>>                         // errors, not just the latest one.
>>>>>>                         logger.p1Error("Big Query job returned error: {}",
>>>>>>                             queryJob.getStatus().getError().toString());
>>>>>>                     } else {
>>>>>>                         // Successful case.
>>>>>>                         logger.info("Parsing results executed by BigQuery");
>>>>>>                         // Get the results.
>>>>>>                         TableResult result = queryJob.getQueryResults();
>>>>>>                         if (null == result || !result.iterateAll().iterator().hasNext()) {
>>>>>>                             logger.info("No data found for query: {}", sqlQuery);
>>>>>>                         } else {
>>>>>>                             // Walk all pages of the results.
>>>>>>                             for (FieldValueList row : result.iterateAll()) {
>>>>>>                                 /*** Parse row and create Stats1Key and Stats1 from that row ***/
>>>>>>                                 resultMap.put(key, stats);
>>>>>>                             }
>>>>>>                         }
>>>>>>                     }
>>>>>>                 } catch (Exception ex) {
>>>>>>                     logger.p1Error("Error in executing sql query against Big Query", ex);
>>>>>>                 }
>>>>>>                 logger.info("Emitting map of size: {}", resultMap.size());
>>>>>>                 c.output(resultMap);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     As I mentioned before, all the classes ReadStats1(), ReadStats2(), etc. follow the above code design.
>>>>>>
>>>>>>
>>>>>> 5. Using KafkaIO, we read a continuous stream of data from kafka:
>>>>>>
>>>>>>     PCollection<POJO> Logs =
>>>>>>         p
>>>>>>             .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>>>                 .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>>>                 .withTopic("topic")
>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>                 .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>>>                 .withConsumerFactoryFn(consumerFactoryObj)
>>>>>>                 .commitOffsetsInFinalize())
>>>>>>             .apply("Applying_Fixed_Window_Logs",
>>>>>>                 Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
>>>>>>                         .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>>>                     .withAllowedLateness(Duration.standardDays(1))
>>>>>>                     .discardingFiredPanes())
>>>>>>             .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>>>                 ParDo.of(new ParseLogs()));
>>>>>>
>>>>>>
>>>>>> 6. Take these logs and apply another transform, providing the aforementioned BQ reads as side inputs, i.e. something like this:
>>>>>>
>>>>>>     Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
>>>>>>
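>>>>>> Inside Decorate the views are consumed through the usual side input
>>>>>> pattern, roughly like this (a sketch, not my exact code):
>>>>>>
>>>>>>     logs.apply("Decorate_Logs", ParDo.of(new DoFn<POJO, POJO>() {
>>>>>>         @ProcessElement
>>>>>>         public void processElement(ProcessContext c) {
>>>>>>             // Materialized maps reloaded from BQ on each trigger.
>>>>>>             Map<Stats1Key, Stats1> stats1 = c.sideInput(Stats1View);
>>>>>>             Map<Stats2Key, Stats2> stats2 = c.sideInput(Stats2View);
>>>>>>             // ... decorate c.element() using the maps and emit ...
>>>>>>         }
>>>>>>     }).withSideInputs(Stats1View, Stats2View));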
>>>>>>
>>>>>> Please note: I tried commenting out the code where I added the side inputs to the above transform and still landed in the same crash. So the issue is definitely in adding
>>>>>>
>>>>>> more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and it was working fine. Yesterday I added a few more and started seeing crashes.
>>>>>>
>>>>>> If I enable just one of the newly added PCollectionView transforms (keeping the old 3-4 intact), then everything works fine. The moment I enable another new transform, a crash happens.
>>>>>>
>>>>>> Hope this provides some more insight. Let me know if I need to open a ticket, or whether I am doing something wrong, or whether some extra configuration or a different worker machine type needs to be provided.
>>>>>>
>>>>>>
>>>>>> Thanks and Regards
>>>>>>
>>>>>> Mohil
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Luke,
>>>>>>>
>>>>>>> I can send you a code snippet with more details if it helps.
>>>>>>>
>>>>>>> BTW, I found a similar issue here:
>>>>>>> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3CCAF9t7_74PKr7fJ51-6_Tbsycz9AiZ_xSM7RCALi5Kmkd1Ng5dQ@mail.gmail.com%3E
>>>>>>>
>>>>>>> Thanks and Regards
>>>>>>> Mohil
>>>>>>>
>>>>>>> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Luke,
>>>>>>>> Thanks for your response. I tried looking at the worker logs using the
>>>>>>>> GCP logging service but was unable to get a clear picture. Not sure if it's
>>>>>>>> due to memory pressure or a low number of harness threads.
>>>>>>>> Attaching a few more screenshots of crash logs that I found, as
>>>>>>>> well as a json dump of logs.
>>>>>>>>
>>>>>>>> Let me know if you still think opening a ticket is the right way to
>>>>>>>> go.
>>>>>>>>
>>>>>>>> Thanks and regards
>>>>>>>> Mohil
>>>>>>>>
>>>>>>>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Try looking at the worker logs to get a full stack trace. Take a
>>>>>>>>> look at this page for some debugging guidance[1] or consider opening a
>>>>>>>>> support case with GCP.
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>>>
>>>>>>>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> BTW, just to make sure that there is no issue with any individual
>>>>>>>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>>>>>>>> successfully. The issue happens as soon as I enable more than one new
>>>>>>>>>> aforementioned PTransform.
>>>>>>>>>>
>>>>>>>>>> Thanks and regards
>>>>>>>>>> Mohil
>>>>>>>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
Sure not a problem. I will open one.

Thanks
Mohil

On Fri, Jun 26, 2020 at 10:55 AM Luke Cwik <lc...@google.com> wrote:

> Sorry, I didn't open a support case. You should open the support case.

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Luke Cwik <lc...@google.com>.
Sorry, I didn't open a support case. You should open the support case.

On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare <mo...@prosimo.io> wrote:

> Thanks a lot Luke for following up on this and opening a Dataflow
> support case. It would be good to know how the streaming engine solved the
> problem. I would really appreciate it if you could share a link to the
> support case once you open it (if that is possible).
>
> Thanks and Regards
> Mohil

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
Thanks a lot Luke for following up on this and opening a Dataflow support
case. It would be good to know how the streaming engine solved the problem.
I would really appreciate it if you could share a link to the support case
once you open it (if that is possible).

Thanks and Regards
Mohil



On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik <lc...@google.com> wrote:

> It seems as though we have seen this failure before for Dataflow; it
> was caused by side input tags needing to be unique in a streaming
> pipeline.
>
> It looked like this used to be a common occurrence in the Python SDK[1, 2]
> because it generated tags that weren't unique enough.
>
> I would open up a case with Dataflow support with all the information you
> have provided here.
>
> 1: https://issues.apache.org/jira/browse/BEAM-4549
> 2: https://issues.apache.org/jira/browse/BEAM-4534
>
> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi Luke and all,
>>
>> UPDATE: So when I started my job by *enabling the streaming engine and
>> keeping the machine type default for the streaming engine (n1-standard-2)*,
>> the pipeline started successfully.
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>
>> Still evaluating to make sure that there is no performance degradation by
>> doing so.
>>
>> Thanks and regards
>> Mohil
>>
>>
>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi Luke,
>>>
>>> Let me give you some more details about the code.
>>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>>
>>> Default machine type which n1-standard-4.
>>> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks it
>>> up based on number of cores available)
>>>
>>> 1: Code listens for some trigger on pubsub topic:
>>>         /**
>>>
>>>      * Read From PubSub for topic ANALYTICS_UPDATE and create PCollection<String> indicating main pipeline to reload     * relevant DataAnalyticsData from BQ table     */    static class MonitorPubSubForDailyAnalyticsDataStatus extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {        private final String subscriptionName;        private final String jobProject;        MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {            this.subscriptionName = subscriptionName;            this.jobProject = jobProject;        }        @Override        public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {            return input.getPipeline()                .apply("Read_PubSub_Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))                .apply("Applying_Windowing", Window.<PubsubMessage>into(new GlobalWindows())                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))                    .discardingFiredPanes())                .apply("Read_Update_Status", ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {                    @ProcessElement                    public void processElement(@Element PubsubMessage input, OutputReceiver<POJORepresentingJobCompleteInfo> out) {                        /*** Read and CReate ***/
>>>
>>>                         out.output(POJORepresentingJobCompleteInfo);                                           }                }));        }    }
>>>
>>> 2: Get Latest Updated and Reload new Updates from various BQ tables using google cloud bigquery library (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>>>
>>>     PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates = p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>>>
>>>             new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));
>>>
>>>
>>> 3. Create various PCollectionViews to be used as side input for decorating stream of logs coming from Kafka (will be shown later)
>>>
>>>    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
>>>
>>>             analyticsDataStatusUpdates                 .apply("Reload_Stats1_FromBQ", new ReadStats1())                 .apply("View_Stats1", View.asSingleton());
>>>
>>>
>>>    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
>>>
>>>             analyticsDataStatusUpdates                 .apply("Reload_Stats1_FromBQ", new ReadStats2())                 .apply("View_Stats1", View.asSingleton());
>>>
>>>    .
>>>
>>>    .
>>>
>>>    .
>>>
>>>    . and so one
>>>
>>>
>>> 4. An example of code where we read stats from BQ i.e. in ReadStats1(), ReadStats2() and so on
>>>
>>>    class ReadStatsS1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>, PCollection<Map<Stats1Key, Stats1>>> {
>>>
>>>       @Override    public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {        return input            .apply("Read_From_BigQuery", ParDo.of(new BigQueryRread()))            .apply("Applying_Windowing", Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))                .discardingFiredPanes());    }    private class BigQueryRread extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {        @ProcessElement        public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {            Map<Stats1Key, Stats1> resultMap = new HashMap<>();                       try {                BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();                String sqlQuery = getSqlQuery(input); ///some method to return desired sql query based on info present in input                QueryJobConfiguration queryJobConfiguration =                    QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();                // Create a job ID so that we can safely retry.                JobId jobId = JobId.of(UUID.randomUUID().toString());                Job queryJob = bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());                // Wait for the query to complete.                queryJob = queryJob.waitFor();                if (queryJob == null) {                    logger.p1Error("Big Query Job no longer exists");                } else if (queryJob.getStatus().getError() != null) {                    // You can also look at queryJob.getStatus().getExecutionErrors() for all                    // errors, not just the latest one.                    logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());                } else {                    //successful case                    logger.info("Parsing results executed by BigQuery");                    // Get the results.                    TableResult result = queryJob.getQueryResults();                    if (null == result || !result.iterateAll().iterator().hasNext()) {                        logger.info("No data found for query: {}", sqlQuery);                    } else {                        // Print all pages of the results.                        for (FieldValueList row : result.iterateAll()) {                                /*** Parse row and create Stats1Key and Stats from that row/                                resultMap.put(key, stats);                            }                        }                    }                }            } catch (Exception ex) {                logger.p1Error("Error in executing sql query against Big Query", ex);            }            logger.info("Emitting map of size: {}", resultMap.size());            c.output(resultMap);        }    }
>>>
>>>     As I mentioned before all classes : ReadStats1(), ReadStats2() etc follow above code design
>>>
>>>
>>> 5. Using KafkaIO we read continuous stream of data from kafka
>>>
>>>     PCollection<POJO> Logs =
>>>
>>>         p            .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()                .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))                .withTopic("topic")                .withKeyDeserializer(StringDeserializer.class)                .withValueDeserializer(ByteArrayDeserializer.class)                .withConsumerConfigUpdates(kafkaConsumerProperties)                .withConsumerFactoryFn(consumerFactoryObj)                .commitOffsetsInFinalize())            .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))                .withAllowedLateness(Duration.standardDays(1))                .discardingFiredPanes())            .apply("Convert_KafkaRecord_To_PCollection<POJO>",                ParDo.of(new ParseLogs());
>>>
>>>
>>> 6. Take these logs and apply another Transform providing aforementioned BQ reads as side input i.e. something like this
>>>
>>>     Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View...);
>>>
>>>
>>> Please Note: I tried commenting out code where I added side input to the above transform and still landed up in the same crash. So Issue is definitely in adding
>>>
>>> more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and it was working fine. Yesterday I added a few more and started seeing crashes.
>>>
>>> If I enable just one of the newly added PCollectionView transforms (keeping old 3-4 intact), then everything works fine. Moment I enable another new transform, a crash happens.
>>>
>>>
>>> Hope this provides some more insight. Let me know if I need to open a ticket or I am doing something wrong or some extra configuration or different worker machine type need to be provided.
>>>
>>>
>>> Thanks and Regards
>>>
>>> Mohil
>>>
>>>
>>> On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> I can send you a code snippet with more details if it helps.
>>>>
>>>> BTW found similar issue here:
>>>> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3CCAF9t7_74PKr7fJ51-6_Tbsycz9AiZ_xSM7RCALi5Kmkd1Ng5dQ@mail.gmail.com%3E
>>>>
>>>> Thanks and Regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hi Luke,
>>>>> Thanks for your response. I tried looking at the worker logs using the
>>>>> logging service of GCP and was unable to get a clear picture. Not sure if it's
>>>>> due to memory pressure or a low number of harness threads.
>>>>> Attaching a few more screenshots of the crash logs that I found, as well as
>>>>> a JSON dump of the logs.
>>>>>
>>>>> Let me know if you still think opening a ticket is the right way to go.
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Try looking at the worker logs to get a full stack trace. Take a look
>>>>>> at this page for some debugging guidance[1] or consider opening a support
>>>>>> case with GCP.
>>>>>>
>>>>>> 1:
>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>>>
>>>>>>> BTW, just to make sure that there is no issue with any individual
>>>>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>>>>> successfully. The issue happens as soon as I enable more than one of the
>>>>>>> new aforementioned PTransforms.
>>>>>>>
>>>>>>> Thanks and regards
>>>>>>> Mohil
>>>>>>>
>>>>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello All,
>>>>>>>>
>>>>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>>>>
>>>>>>>> Need urgent help in debugging one issue.
>>>>>>>>
>>>>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>>>>> where I read data from BQ for a certain timestamp and create
>>>>>>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>>>>>>> PTransforms.
>>>>>>>>
>>>>>>>> i.e. something like this:
>>>>>>>>
>>>>>>>> /**
>>>>>>>>  * Get PCollectionView Stats1
>>>>>>>>  */
>>>>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>>>>     jobCompleteStatus
>>>>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>>>>
>>>>>>>> /**
>>>>>>>>  * Get PCollectionView of Stats2
>>>>>>>>  */
>>>>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>>>>     jobCompleteStatus
>>>>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>>>>
>>>>>>>>
>>>>>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>>>>>>>
>>>>>>>> received from PubSub that acts as a trigger to reload these views.
>>>>>>>>
>>>>>>>> The moment I deployed the above pipeline, it didn't start and
>>>>>>>>
>>>>>>>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>>>>>>>
>>>>>>>>
>>>>>>>> Then as an experiment I made a change where I enabled only one new transformation
>>>>>>>>
>>>>>>>> and disabled others. This time I didn't see any issue.
>>>>>>>>
>>>>>>>> So it looks like some memory issue.
>>>>>>>>
>>>>>>>> I also compared worker logs between the working case and the non-working case
>>>>>>>>
>>>>>>>> and it looks like resources were not granted in the non-working case.
>>>>>>>>
>>>>>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>>>>>
>>>>>>>> I couldn't find any other log.
>>>>>>>>
>>>>>>>>
>>>>>>>> I would really appreciate quick help here.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks and Regards
>>>>>>>>
>>>>>>>> Mohil
>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Luke Cwik <lc...@google.com>.
It seems as though we have seen this failure before for Dataflow; it was
caused by side input tags not being unique in a streaming
pipeline.

It looked like this used to be a common occurrence in the Python SDK[1, 2]
because it generated tags that weren't unique enough.

I would open up a case with Dataflow support with all the information you
have provided here.

1: https://issues.apache.org/jira/browse/BEAM-4549
2: https://issues.apache.org/jira/browse/BEAM-4534
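
For concreteness, a hedged Java sketch (not the pipeline's real code) of what
unique tags look like in practice: every view-producing apply() gets a step
name that is unique across the pipeline. Note that step 3 of the quoted code
below reuses "Reload_Stats1_FromBQ" and "View_Stats1" for the Stats2 view,
which is exactly the kind of name reuse described above.

    // Hedged sketch with illustrative names: distinct step names per view.
    PCollectionView<Map<Stats1Key, Stats1>> stats1View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats1_FromBQ", new ReadStats1())
            .apply("View_Stats1", View.asSingleton());

    PCollectionView<Map<Stats2Key, Stats2>> stats2View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats2_FromBQ", new ReadStats2())  // not a copy-paste of the S1 name
            .apply("View_Stats2", View.asSingleton());        // distinct view name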

On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Luke and all,
>
> UPDATE: So when I started my job by *enabling the streaming engine and
> keeping the machine type default for the streaming engine (n1-standard-2)*,
> the pipeline started successfully.
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>
> Still evaluating to make sure that there is no performance degradation by
> doing so.
>
> Thanks and regards
> Mohil
>
>
> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi Luke,
>>
>> Let me give you some more details about the code.
>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>
>> Default machine type, which is n1-standard-4.
>> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks it up
>> based on the number of cores available).
>>
>> 1. Code listens for a trigger on a pubsub topic:
>>
>>     /**
>>      * Read From PubSub for topic ANALYTICS_UPDATE and create PCollection<String> indicating main pipeline to reload
>>      * relevant DataAnalyticsData from BQ table
>>      */
>>     static class MonitorPubSubForDailyAnalyticsDataStatus extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {
>>         private final String subscriptionName;
>>         private final String jobProject;
>>
>>         MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
>>             this.subscriptionName = subscriptionName;
>>             this.jobProject = jobProject;
>>         }
>>
>>         @Override
>>         public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
>>             return input.getPipeline()
>>                 .apply("Read_PubSub_Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>>                 .apply("Applying_Windowing", Window.<PubsubMessage>into(new GlobalWindows())
>>                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>                     .discardingFiredPanes())
>>                 .apply("Read_Update_Status", ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
>>                     @ProcessElement
>>                     public void processElement(@Element PubsubMessage input, OutputReceiver<POJORepresentingJobCompleteInfo> out) {
>>                         /*** Read and Create ***/
>>                         out.output(POJORepresentingJobCompleteInfo);
>>                     }
>>                 }));
>>         }
>>     }
>>
>> 2. Get the latest update and reload new updates from the various BQ tables using the google cloud bigquery client library (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>>
>>     PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates = p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>>
>>             new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));
>>
>>
>> 3. Create various PCollectionViews to be used as side inputs for decorating the stream of logs coming from Kafka (shown later):
>>
>>    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
>>        analyticsDataStatusUpdates
>>            .apply("Reload_Stats1_FromBQ", new ReadStats1())
>>            .apply("View_Stats1", View.asSingleton());
>>
>>
>>    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
>>        analyticsDataStatusUpdates
>>            .apply("Reload_Stats1_FromBQ", new ReadStats2())
>>            .apply("View_Stats1", View.asSingleton());
>>
>>    .
>>
>>    .
>>
>>    .
>>
>>    . and so on
>>
>>
>> 4. An example of the code where we read stats from BQ, i.e. in ReadStats1(), ReadStats2() and so on:
>>
>>    class ReadStatsS1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>, PCollection<Map<Stats1Key, Stats1>>> {
>>
>>        @Override
>>        public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
>>            return input
>>                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
>>                .apply("Applying_Windowing", Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
>>                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>                    .discardingFiredPanes());
>>        }
>>
>>        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {
>>            @ProcessElement
>>            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
>>                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
>>                try {
>>                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
>>                    String sqlQuery = getSqlQuery(input); // some method to return the desired SQL query based on info present in input
>>                    QueryJobConfiguration queryJobConfiguration =
>>                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
>>                    // Create a job ID so that we can safely retry.
>>                    JobId jobId = JobId.of(UUID.randomUUID().toString());
>>                    Job queryJob = bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
>>                    // Wait for the query to complete.
>>                    queryJob = queryJob.waitFor();
>>                    if (queryJob == null) {
>>                        logger.p1Error("Big Query Job no longer exists");
>>                    } else if (queryJob.getStatus().getError() != null) {
>>                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
>>                        // errors, not just the latest one.
>>                        logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());
>>                    } else {
>>                        // Successful case.
>>                        logger.info("Parsing results executed by BigQuery");
>>                        // Get the results.
>>                        TableResult result = queryJob.getQueryResults();
>>                        if (null == result || !result.iterateAll().iterator().hasNext()) {
>>                            logger.info("No data found for query: {}", sqlQuery);
>>                        } else {
>>                            // Iterate over all pages of the results.
>>                            for (FieldValueList row : result.iterateAll()) {
>>                                // Parse row and create Stats1Key and Stats1 from that row.
>>                                resultMap.put(key, stats);
>>                            }
>>                        }
>>                    }
>>                } catch (Exception ex) {
>>                    logger.p1Error("Error in executing sql query against Big Query", ex);
>>                }
>>                logger.info("Emitting map of size: {}", resultMap.size());
>>                c.output(resultMap);
>>            }
>>        }
>>    }
>>
>>     As I mentioned before, all classes (ReadStats1(), ReadStats2(), etc.) follow the above code design.
>>
>>
>> 5. Using KafkaIO, we read a continuous stream of data from Kafka:
>>
>>     PCollection<POJO> Logs =
>>
>>         p
>>             .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>                 .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>                 .withTopic("topic")
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>                 .withConsumerConfigUpdates(kafkaConsumerProperties)
>>                 .withConsumerFactoryFn(consumerFactoryObj)
>>                 .commitOffsetsInFinalize())
>>             .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>                 .withAllowedLateness(Duration.standardDays(1))
>>                 .discardingFiredPanes())
>>             .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>                 ParDo.of(new ParseLogs()));
>>
>>
>> 6. Take these logs and apply another transform, providing the aforementioned BQ reads as side inputs, i.e. something like this:
>>
>>     Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View...);
>>
>>
>> Please note: I tried commenting out the code where I added the side inputs to the above transform and still ended up with the same crash. So the issue is definitely in adding
>>
>> more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and everything was working fine. Yesterday I added a few more and started seeing crashes.
>>
>> If I enable just one of the newly added PCollectionView transforms (keeping the old 3-4 intact), then everything works fine. The moment I enable another new transform, a crash happens.
>>
>>
>> Hope this provides some more insight. Let me know if I need to open a ticket, whether I am doing something wrong, or whether some extra configuration or a different worker machine type needs to be provided.
>>
>>
>> Thanks and Regards
>>
>> Mohil
>>
>>
>> On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi Luke,
>>>
>>> I can send you a code snippet with more details if it helps.
>>>
>>> BTW, I found a similar issue here:
>>> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3CCAF9t7_74PKr7fJ51-6_Tbsycz9AiZ_xSM7RCALi5Kmkd1Ng5dQ@mail.gmail.com%3E
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi Luke,
>>>> Thanks for your response. I tried looking at the worker logs using the
>>>> logging service of GCP and was unable to get a clear picture. Not sure if it's
>>>> due to memory pressure or a low number of harness threads.
>>>> Attaching a few more screenshots of the crash logs that I found, as well as
>>>> a JSON dump of the logs.
>>>>
>>>> Let me know if you still think opening a ticket is the right way to go.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Try looking at the worker logs to get a full stack trace. Take a look
>>>>> at this page for some debugging guidance[1] or consider opening a support
>>>>> case with GCP.
>>>>>
>>>>> 1:
>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>
>>>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>>
>>>>>> BTW, just to make sure that there is no issue with any individual
>>>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>>>> successfully. The issue happens as soon as I enable more than one of the
>>>>>> new aforementioned PTransforms.
>>>>>>
>>>>>> Thanks and regards
>>>>>> Mohil
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>>>
>>>>>>> Hello All,
>>>>>>>
>>>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>>>
>>>>>>> Need urgent help in debugging one issue.
>>>>>>>
>>>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>>>> where I read data from BQ for a certain timestamp and create
>>>>>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>>>>>> PTransforms.
>>>>>>>
>>>>>>> i.e. something like this:
>>>>>>>
>>>>>>> /**
>>>>>>>  * Get PCollectionView Stats1
>>>>>>>  */
>>>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>>>     jobCompleteStatus
>>>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>>>
>>>>>>> /**
>>>>>>>  * Get PCollectionView of Stats2
>>>>>>>  */
>>>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>>>     jobCompleteStatus
>>>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>>>
>>>>>>>
>>>>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>>>>>>
>>>>>>> received from PubSub that acts as a trigger to reload these views.
>>>>>>>
>>>>>>> The moment I deployed the above pipeline, it didn't start and
>>>>>>>
>>>>>>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>>>>>>
>>>>>>>
>>>>>>> Then as an experiment I made a change where I enabled only one new transformation
>>>>>>>
>>>>>>> and disabled others. This time I didn't see any issue.
>>>>>>>
>>>>>>> So it looks like some memory issue.
>>>>>>>
>>>>>>> I also compared worker logs between the working case and the non-working case
>>>>>>>
>>>>>>> and it looks like resources were not granted in the non-working case.
>>>>>>>
>>>>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>>>>
>>>>>>> I couldn't find any other log.
>>>>>>>
>>>>>>>
>>>>>>> I would really appreciate quick help here.
>>>>>>>
>>>>>>>
>>>>>>> Thanks and Regards
>>>>>>>
>>>>>>> Mohil
>>>>>>>
>>>>>>>
>>>>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
Hi Luke and all,

UPDATE: So when I started my job by *enabling the streaming engine and
keeping the machine type default for the streaming engine (n1-standard-2)*,
the pipeline started successfully.
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
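
For reference, a minimal sketch of enabling it from Java (invented setup code,
assuming DataflowPipelineOptions from the Dataflow runner; the real job's
options wiring isn't shown in this thread):

    // Hedged sketch; variable names are illustrative, not from the real job.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setEnableStreamingEngine(true);        // same effect as passing --enableStreamingEngine
    options.setWorkerMachineType("n1-standard-2"); // the Streaming Engine default mentioned above
    Pipeline p = Pipeline.create(options);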

Still evaluating to make sure that there is no performance degradation by
doing so.

Thanks and regards
Mohil


On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Luke,
>
> Let me give you some more details about the code.
> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>
> Default machine type, which is n1-standard-4.
> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks it up
> based on the number of cores available).
>
> 1. Code listens for a trigger on a pubsub topic:
>
>     /**
>      * Read From PubSub for topic ANALYTICS_UPDATE and create PCollection<String> indicating main pipeline to reload
>      * relevant DataAnalyticsData from BQ table
>      */
>     static class MonitorPubSubForDailyAnalyticsDataStatus extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {
>         private final String subscriptionName;
>         private final String jobProject;
>
>         MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
>             this.subscriptionName = subscriptionName;
>             this.jobProject = jobProject;
>         }
>
>         @Override
>         public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
>             return input.getPipeline()
>                 .apply("Read_PubSub_Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>                 .apply("Applying_Windowing", Window.<PubsubMessage>into(new GlobalWindows())
>                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                     .discardingFiredPanes())
>                 .apply("Read_Update_Status", ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
>                     @ProcessElement
>                     public void processElement(@Element PubsubMessage input, OutputReceiver<POJORepresentingJobCompleteInfo> out) {
>                         /*** Read and Create ***/
>                         out.output(POJORepresentingJobCompleteInfo);
>                     }
>                 }));
>         }
>     }
>
> 2. Get the latest update and reload new updates from the various BQ tables using the google cloud bigquery client library (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>
>     PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates = p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>
>             new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));
>
>
> 3. Create various PCollectionViews to be used as side inputs for decorating the stream of logs coming from Kafka (shown later):
>
>    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
>        analyticsDataStatusUpdates
>            .apply("Reload_Stats1_FromBQ", new ReadStats1())
>            .apply("View_Stats1", View.asSingleton());
>
>
>    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
>        analyticsDataStatusUpdates
>            .apply("Reload_Stats1_FromBQ", new ReadStats2())
>            .apply("View_Stats1", View.asSingleton());
>
>    .
>
>    .
>
>    .
>
>    . and so on
>
>
> 4. An example of the code where we read stats from BQ, i.e. in ReadStats1(), ReadStats2() and so on:
>
>    class ReadStatsS1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>, PCollection<Map<Stats1Key, Stats1>>> {
>
>        @Override
>        public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
>            return input
>                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
>                .apply("Applying_Windowing", Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
>                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                    .discardingFiredPanes());
>        }
>
>        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {
>            @ProcessElement
>            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
>                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
>                try {
>                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
>                    String sqlQuery = getSqlQuery(input); // some method to return the desired SQL query based on info present in input
>                    QueryJobConfiguration queryJobConfiguration =
>                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
>                    // Create a job ID so that we can safely retry.
>                    JobId jobId = JobId.of(UUID.randomUUID().toString());
>                    Job queryJob = bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
>                    // Wait for the query to complete.
>                    queryJob = queryJob.waitFor();
>                    if (queryJob == null) {
>                        logger.p1Error("Big Query Job no longer exists");
>                    } else if (queryJob.getStatus().getError() != null) {
>                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
>                        // errors, not just the latest one.
>                        logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());
>                    } else {
>                        // Successful case.
>                        logger.info("Parsing results executed by BigQuery");
>                        // Get the results.
>                        TableResult result = queryJob.getQueryResults();
>                        if (null == result || !result.iterateAll().iterator().hasNext()) {
>                            logger.info("No data found for query: {}", sqlQuery);
>                        } else {
>                            // Iterate over all pages of the results.
>                            for (FieldValueList row : result.iterateAll()) {
>                                // Parse row and create Stats1Key and Stats1 from that row.
>                                resultMap.put(key, stats);
>                            }
>                        }
>                    }
>                } catch (Exception ex) {
>                    logger.p1Error("Error in executing sql query against Big Query", ex);
>                }
>                logger.info("Emitting map of size: {}", resultMap.size());
>                c.output(resultMap);
>            }
>        }
>    }
>
>     As I mentioned before, all classes (ReadStats1(), ReadStats2(), etc.) follow the above code design.
>
>
> 5. Using KafkaIO, we read a continuous stream of data from Kafka:
>
>     PCollection<POJO> Logs =
>
>         p
>             .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>                 .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>                 .withTopic("topic")
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(ByteArrayDeserializer.class)
>                 .withConsumerConfigUpdates(kafkaConsumerProperties)
>                 .withConsumerFactoryFn(consumerFactoryObj)
>                 .commitOffsetsInFinalize())
>             .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>                 .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>                 .withAllowedLateness(Duration.standardDays(1))
>                 .discardingFiredPanes())
>             .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>                 ParDo.of(new ParseLogs()));
>
>
> 6. Take these logs and apply another transform, providing the aforementioned BQ reads as side inputs, i.e. something like this:
>
>     Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View...);
>
>
> Please note: I tried commenting out the code where I added the side inputs to the above transform and still ended up with the same crash. So the issue is definitely in adding
>
> more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and everything was working fine. Yesterday I added a few more and started seeing crashes.
>
> If I enable just one of the newly added PCollectionView transforms (keeping the old 3-4 intact), then everything works fine. The moment I enable another new transform, a crash happens.
>
>
> Hope this provides some more insight. Let me know if I need to open a ticket, whether I am doing something wrong, or whether some extra configuration or a different worker machine type needs to be provided.
>
>
> Thanks and Regards
>
> Mohil
>
>
> On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi Luke,
>>
>> I can send you a code snippet with more details if it helps.
>>
>> BTW, I found a similar issue here:
>> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3CCAF9t7_74PKr7fJ51-6_Tbsycz9AiZ_xSM7RCALi5Kmkd1Ng5dQ@mail.gmail.com%3E
>>
>> Thanks and Regards
>> Mohil
>>
>> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi Luke,
>>> Thanks for your response. I tried looking at the worker logs using the
>>> logging service of GCP and was unable to get a clear picture. Not sure if it's
>>> due to memory pressure or a low number of harness threads.
>>> Attaching a few more screenshots of the crash logs that I found, as well as
>>> a JSON dump of the logs.
>>>
>>> Let me know if you still think opening a ticket is the right way to go.
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Try looking at the worker logs to get a full stack trace. Take a look
>>>> at this page for some debugging guidance[1] or consider opening a support
>>>> case with GCP.
>>>>
>>>> 1:
>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>
>>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> BTW, just to make sure that there is no issue with any individual
>>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>>> successfully. The issue happens as soon as I enable more than one of the
>>>>> new aforementioned PTransforms.
>>>>>
>>>>> Thanks and regards
>>>>> Mohil
>>>>>
>>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>>
>>>>>> Need urgent help in debugging one issue.
>>>>>>
>>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>>> where I read data from BQ for a certain timestamp and create
>>>>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>>>>> PTransforms.
>>>>>>
>>>>>> i.e. something like this:
>>>>>>
>>>>>> /**
>>>>>>  * Get PCollectionView Stats1
>>>>>>  */
>>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>>     jobCompleteStatus
>>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>>
>>>>>> /**
>>>>>>  * Get PCollectionView of Stats2
>>>>>>  */
>>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>>     jobCompleteStatus
>>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>>
>>>>>>
>>>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>>>>>
>>>>>> received from PubSub that acts as a trigger to reload these views.
>>>>>>
>>>>>> The moment I deployed the above pipeline, it didn't start and
>>>>>>
>>>>>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>>>>>
>>>>>>
>>>>>> Then as an experiment I made a change where I enabled only one new transformation
>>>>>>
>>>>>> and disabled others. This time I didn't see any issue.
>>>>>>
>>>>>> So it looks like some memory issue.
>>>>>>
>>>>>> I also compared worker logs between the working case and the non-working case
>>>>>>
>>>>>> and it looks like resources were not granted in the non-working case.
>>>>>>
>>>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>>>
>>>>>> I couldn't find any other log.
>>>>>>
>>>>>>
>>>>>> I would really appreciate quick help here.
>>>>>>
>>>>>>
>>>>>> Thanks and Regards
>>>>>>
>>>>>> Mohil
>>>>>>
>>>>>>
>>>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
Hi Luke,

Let me give you some more details about the code.
As I mentioned before, I am using java sdk 2.19.0 on dataflow.

Default machine type, which is n1-standard-4.
Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks it up
based on the number of cores available).
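
If one ever wants to pin the thread count explicitly instead of relying on
that default, a hedged sketch (assuming DataflowPipelineDebugOptions from the
Dataflow runner and an options object like the one the job already builds;
the value is illustrative, not a recommendation):

    DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
    debugOptions.setNumberOfWorkerHarnessThreads(12); // illustrative; 0/unset lets the service decide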

1. Code listens for a trigger on a pubsub topic:

    /**
     * Read From PubSub for topic ANALYTICS_UPDATE and create PCollection<String> indicating main pipeline to reload
     * relevant DataAnalyticsData from BQ table
     */
    static class MonitorPubSubForDailyAnalyticsDataStatus extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {
        private final String subscriptionName;
        private final String jobProject;

        MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
            this.subscriptionName = subscriptionName;
            this.jobProject = jobProject;
        }

        @Override
        public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
            return input.getPipeline()
                .apply("Read_PubSub_Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
                .apply("Applying_Windowing", Window.<PubsubMessage>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
                .apply("Read_Update_Status", ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
                    @ProcessElement
                    public void processElement(@Element PubsubMessage input, OutputReceiver<POJORepresentingJobCompleteInfo> out) {
                        /*** Read and Create ***/
                        out.output(POJORepresentingJobCompleteInfo);
                    }
                }));
        }
    }

2. Get the latest update and reload new updates from the various BQ tables
using the google cloud bigquery client library
(https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)

    PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates =
        p.apply("Get_Analytics_Data_Status_Updates_pubsub",
            new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));


3. Create various PCollectionViews to be used as side inputs for
decorating the stream of logs coming from Kafka (shown later):

   PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
       analyticsDataStatusUpdates
           .apply("Reload_Stats1_FromBQ", new ReadStats1())
           .apply("View_Stats1", View.asSingleton());


   PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
       analyticsDataStatusUpdates
           .apply("Reload_Stats1_FromBQ", new ReadStats2())
           .apply("View_Stats1", View.asSingleton());

   .

   .

   .

   . and so on


4. An example of code where we read stats from BQ i.e. in
ReadStats1(), ReadStats2() and so on

   class ReadStatsS1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>, PCollection<Map<Stats1Key, Stats1>>> {

       @Override
       public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
           return input
               .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
               .apply("Applying_Windowing", Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
                   .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                   .discardingFiredPanes());
       }

       private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {
           @ProcessElement
           public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
               Map<Stats1Key, Stats1> resultMap = new HashMap<>();
               try {
                   BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
                   String sqlQuery = getSqlQuery(input); // some method to return the desired SQL query based on info present in input
                   QueryJobConfiguration queryJobConfiguration =
                       QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
                   // Create a job ID so that we can safely retry.
                   JobId jobId = JobId.of(UUID.randomUUID().toString());
                   Job queryJob = bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
                   // Wait for the query to complete.
                   queryJob = queryJob.waitFor();
                   if (queryJob == null) {
                       logger.p1Error("Big Query Job no longer exists");
                   } else if (queryJob.getStatus().getError() != null) {
                       // You can also look at queryJob.getStatus().getExecutionErrors() for all
                       // errors, not just the latest one.
                       logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());
                   } else {
                       // Successful case.
                       logger.info("Parsing results executed by BigQuery");
                       // Get the results.
                       TableResult result = queryJob.getQueryResults();
                       if (null == result || !result.iterateAll().iterator().hasNext()) {
                           logger.info("No data found for query: {}", sqlQuery);
                       } else {
                           // Iterate over all pages of the results.
                           for (FieldValueList row : result.iterateAll()) {
                               // Parse row and create Stats1Key and Stats1 from that row.
                               resultMap.put(key, stats);
                           }
                       }
                   }
               } catch (Exception ex) {
                   logger.p1Error("Error in executing sql query against Big Query", ex);
               }
               logger.info("Emitting map of size: {}", resultMap.size());
               c.output(resultMap);
           }
       }
   }

    As I mentioned before, all classes (ReadStats1(), ReadStats2(), etc.)
follow the above code design.


5. Using KafkaIO, we read a continuous stream of data from Kafka:

    PCollection<POJO> Logs =
        p
            .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
                .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
                .withTopic("topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(kafkaConsumerProperties)
                .withConsumerFactoryFn(consumerFactoryObj)
                .commitOffsetsInFinalize())
            .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                .withAllowedLateness(Duration.standardDays(1))
                .discardingFiredPanes())
            .apply("Convert_KafkaRecord_To_PCollection<POJO>",
                ParDo.of(new ParseLogs()));
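
The contents of kafkaConsumerProperties and consumerFactoryObj aren't shown
here; a hedged sketch of what such a consumer config map typically holds
(keys come from the Kafka client's ConsumerConfig; the values are
illustrative only, not the real ones):

    Map<String, Object> kafkaConsumerProperties = new HashMap<>();
    kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "log-decorator");   // illustrative
    kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // illustrative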


6. Take these logs and apply another transform, providing the
aforementioned BQ reads as side inputs, i.e. something like this
(a sketch of the wiring inside such a transform follows below):

    Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
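
The internals of Decorate aren't shown in this thread; a hedged sketch of the
usual wiring (invented names), where views are declared via withSideInputs(...)
and read per element with c.sideInput(view):

    PCollection<POJO> decorated = Logs.apply("Decorate_Logs", ParDo.of(
        new DoFn<POJO, POJO>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Read the current contents of each view for this element's window.
                Map<Stats1Key, Stats1> stats1 = c.sideInput(Stats1View);
                Map<Stats2Key, Stats2> stats2 = c.sideInput(Stats2View);
                // ... enrich c.element() using the maps, then emit ...
                c.output(c.element());
            }
        }).withSideInputs(Stats1View, Stats2View));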


Please note: I tried commenting out the code where I added the side inputs
to the above transform and still ended up with the same crash. So the
issue is definitely in adding

more than a certain number of PCollectionView transforms. I already
had 3-4 such transforms and everything was working fine. Yesterday I added a
few more and started seeing crashes.

If I enable just one of the newly added PCollectionView transforms
(keeping the old 3-4 intact), then everything works fine. The moment I
enable another new transform, a crash happens.


Hope this provides some more insight. Let me know if I need to open a
ticket, whether I am doing something wrong, or whether some extra
configuration or a different worker machine type needs to be provided.


Thanks and Regards

Mohil


On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Luke,
>
> I can send you a code snippet with more details if it helps.
>
> BTW, I found a similar issue here:
> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3CCAF9t7_74PKr7fJ51-6_Tbsycz9AiZ_xSM7RCALi5Kmkd1Ng5dQ@mail.gmail.com%3E
>
> Thanks and Regards
> Mohil
>
> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi Luke,
>> Thanks for your response. I tried looking at the worker logs using the logging
>> service of GCP and was unable to get a clear picture. Not sure if it's due to
>> memory pressure or a low number of harness threads.
>> Attaching a few more screenshots of the crash logs that I found, as well as a
>> JSON dump of the logs.
>>
>> Let me know if you still think opening a ticket is the right way to go.
>>
>> Thanks and regards
>> Mohil
>>
>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Try looking at the worker logs to get a full stack trace. Take a look at
>>> this page for some debugging guidance[1] or consider opening a support case
>>> with GCP.
>>>
>>> 1:
>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>
>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> BTW, just to make sure that there is no issue with any individual
>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>> successfully. The issue happens as soon as I enable more than one of the
>>>> new aforementioned PTransforms.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>
>>>>> Need urgent help in debugging one issue.
>>>>>
>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>> where I read data from BQ for a certain timestamp and create
>>>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>>>> PTransforms.
>>>>>
>>>>> i.e. something like this:
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView Stats1
>>>>>  */
>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>     jobCompleteStatus
>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView of Stats2
>>>>>  */
>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>     jobCompleteStatus
>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>
>>>>>
>>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>>>>
>>>>> received from PubSub that acts as a trigger to reload these views.
>>>>>
>>>>> The moment I deployed the above pipeline, it didn't start and
>>>>>
>>>>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>>>>
>>>>>
>>>>> Then as an experiment I made a change where I enabled only one new transformation
>>>>>
>>>>> and disabled others. This time I didn't see any issue.
>>>>>
>>>>> So it looks like some memory issue.
>>>>>
>>>>> I also compared worker logs between the working case and the non-working case
>>>>>
>>>>> and it looks like resources were not granted in the non-working case.
>>>>>
>>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>>
>>>>> I couldn't find any other log.
>>>>>
>>>>>
>>>>> I would really appreciate quick help here.
>>>>>
>>>>>
>>>>> Thanks and Regards
>>>>>
>>>>> Mohil
>>>>>
>>>>>
>>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
Hi Luke,

I can send you a code snippet with more details if it helps.

BTW, I found a similar issue here:
http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3CCAF9t7_74PKr7fJ51-6_Tbsycz9AiZ_xSM7RCALi5Kmkd1Ng5dQ@mail.gmail.com%3E

Thanks and Regards
Mohil

On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Luke,
> Thanks for your response. I tried looking at the worker logs using the logging
> service of GCP and was unable to get a clear picture. Not sure if it's due to
> memory pressure or a low number of harness threads.
> Attaching a few more screenshots of the crash logs that I found, as well as a
> JSON dump of the logs.
>
> Let me know if you still think opening a ticket is the right way to go.
>
> Thanks and regards
> Mohil
>
> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>
>> Try looking at the worker logs to get a full stack trace. Take a look at
>> this page for some debugging guidance[1] or consider opening a support case
>> with GCP.
>>
>> 1:
>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>
>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> BTW, just to make sure that there is no issue with any individual
>>> PTransform, I enabled each one of them one by one and the pipeline started
>>> successfully. The issue happens as soon as I enable more than one of the
>>> new aforementioned PTransforms.
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>
>>>> Need urgent help in debugging one issue.
>>>>
>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>> where I read data from BQ for a certain timestamp and create
>>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>>> PTransforms.
>>>>
>>>> i.e. something like this:
>>>>
>>>> /**
>>>>  * Get PCollectionView Stats1
>>>>  */
>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>     jobCompleteStatus
>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>
>>>> /**
>>>>  * Get PCollectionView of Stats2
>>>>  */
>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>     jobCompleteStatus
>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>
>>>>
>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>>>
>>>> received from PubSub that acts as a trigger to reload these views.
>>>>
>>>> The moment I deployed the above pipeline, it didn't start and
>>>>
>>>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>>>
>>>>
>>>> Then as an experiment I made a change where I enabled only one new transformation
>>>>
>>>> and disabled others. This time I didn't see any issue.
>>>>
>>>> So it looks like some memory issue.
>>>>
>>>> I also compared worker logs between the working case and the non-working case
>>>>
>>>> and it looks like resources were not granted in the non-working case.
>>>>
>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>
>>>> I couldn't find any other log.
>>>>
>>>>
>>>> I would really appreciate quick help here.
>>>>
>>>>
>>>> Thanks and Regards
>>>>
>>>> Mohil
>>>>
>>>>
>>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
Hi Luke,
Thanks for your response. I tried looking at the worker logs using the logging
service of GCP and was unable to get a clear picture. Not sure if it's due to
memory pressure or a low number of harness threads.
Attaching a few more screenshots of the crash logs that I found, as well as a
JSON dump of the logs.

Let me know if you still think opening a ticket is the right way to go.

Thanks and regards
Mohil

On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:

> Try looking at the worker logs to get a full stack trace. Take a look at
> this page for some debugging guidance[1] or consider opening a support case
> with GCP.
>
> 1:
> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>
> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> BTW, just to make sure that there is no issue with any individual
>> PTransform, I enabled each one of them one by one and the pipeline started
>> successfully. The issue happens as soon as I enable more than one of the
>> new aforementioned PTransforms.
>>
>> Thanks and regards
>> Mohil
>>
>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hello All,
>>>
>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>
>>> Need urgent help in debugging one issue.
>>>
>>> I recently added 3-4 new PTransforms to an existing pipeline where
>>> I read data from BQ for a certain timestamp and create
>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>> PTransforms.
>>>
>>> i.e. something like this:
>>>
>>> /**
>>>  * Get PCollectionView Stats1
>>>  */
>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>     jobCompleteStatus
>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>         .apply("View_S1STATS", View.asSingleton());
>>>
>>> /**
>>>  * Get PCollectionView of Stats2
>>>  */
>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>     jobCompleteStatus
>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>         .apply("View_S2STATS", View.asSingleton());
>>>
>>>
>>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>>
>>> received from PubSub that acts as a trigger to reload these views.
>>>
>>> The moment I deployed the above pipeline, it didn't start and
>>>
>>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>>
>>>
>>> Then as an experiment I made a change where I enabled only one new transformation
>>>
>>> and disabled others. This time I didn't see any issue.
>>>
>>> So it looks like some memory issue.
>>>
>>> I also compared worker logs between the working case and the non-working case
>>>
>>> and it looks like resources were not granted in the non-working case.
>>>
>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>
>>> I couldn't find any other log.
>>>
>>>
>>> I would really appreciate quick help here.
>>>
>>>
>>> Thanks and Regards
>>>
>>> Mohil
>>>
>>>
>>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Luke Cwik <lc...@google.com>.
Try looking at the worker logs to get a full stack trace. Take a look at
this page for some debugging guidance[1] or consider opening a support case
with GCP.

1:
https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline

On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:

> BTW, just to make sure that there is no issue with any individual
> PTransform, I enabled each one of them one by one and the pipeline started
> successfully. The issue happens as soon as I enable more than one of the
> new aforementioned PTransforms.
>
> Thanks and regards
> Mohil
>
> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hello All,
>>
>> I am using the BEAM java 2.19.0 version on google dataflow.
>>
>> Need urgent help in debugging one issue.
>>
>> I recently added 3-4 new PTransforms to an existing pipeline where
>> I read data from BQ for a certain timestamp and create
>> PCollectionView<Map<Key,value>> to be used as side input in other
>> PTransforms.
>>
>> i.e. something like this:
>>
>> /**
>>  * Get PCollectionView Stats1
>>  */
>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>     jobCompleteStatus
>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>         .apply("View_S1STATS", View.asSingleton());
>>
>> /**
>>  * Get PCollectionView of Stats2
>>  */
>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>     jobCompleteStatus
>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>         .apply("View_S2STATS", View.asSingleton());
>>
>>
>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>
>> received from PubSub that acts as a trigger to reload these views.
>>
>> The moment I deployed the above pipeline, it didn't start and
>>
>> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>>
>>
>> Then as an experiment I made a change where I enabled only one new transformation
>>
>> and disabled others. This time I didn't see any issue.
>>
>> So it looks like some memory issue.
>>
>> I also compared worker logs between the working case and the non-working case
>>
>> and it looks like resources were not granted in the non-working case.
>>
>> (See attached working-workerlogs and nonworking-workerlogs)
>>
>> I couldn't find any other log.
>>
>>
>> I would really appreciate quick help here.
>>
>>
>> Thanks and Regards
>>
>> Mohil
>>
>>
>>

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

Posted by Mohil Khare <mo...@prosimo.io>.
BTW, just to make sure that there is no issue with any individual
PTransform, I enabled each one of them one by one and the pipeline started
successfully. The issue happens as soon as I enable more than one of the
new aforementioned PTransforms.

Thanks and regards
Mohil

On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hello All,
>
> I am using the BEAM java 2.19.0 version on google dataflow.
>
> Need urgent help in debugging one issue.
>
> I recently added 3-4 new PTransforms to an existing pipeline where I
> read data from BQ for a certain timestamp and create
> PCollectionView<Map<Key,value>> to be used as side input in other
> PTransforms.
>
> i.e. something like this:
>
> /**
>  * Get PCollectionView Stats1
>  */
> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>     jobCompleteStatus
>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>         .apply("View_S1STATS", View.asSingleton());
>
> /**
>  * Get PCollectionView of Stats2
>  */
> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>     jobCompleteStatus
>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>         .apply("View_S2STATS", View.asSingleton());
>
>
> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>
> received from PubSub that acts as a trigger to reload these views.
>
> The moment I deployed the above pipeline, it didn't start and
>
> error reporting gave weird exceptions (see attached screenshot1 and screenshot) which I don't know how to debug.
>
>
> Then as an experiment I made a change where I enabled only one new transformation
>
> and disabled others. This time I didn't see any issue.
>
> So it looks like some memory issue.
>
> I also compared worker logs between the working case and the non-working case
>
> and it looks like resources were not granted in the non-working case.
>
> (See attached working-workerlogs and nonworking-workerlogs)
>
> I couldn't find any other log.
>
>
> I would really appreciate quick help here.
>
>
> Thanks and Regards
>
> Mohil
>
>
>