Posted to users@hop.apache.org by Fabian Peters <po...@mercadu.de> on 2022/09/02 10:55:51 UTC

Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Hi once more,

I feel a little bit like I've started my slow descent into madness. What I assumed was a configuration error now looks like it isn't one. I've put together a minimal test case project <https://github.com/fbarthez/BeamJoinSpike> showing the same behaviour I'm seeing in my project. I'd be very grateful if someone could take a look and tell me whether I'm taking a wrong turn somewhere or whether there really is a bug.

What I'm trying to achieve: There are records of one entity (agent) that come with a site_id field. Some site_id values do not correspond to a valid site. The agent records are to be joined with valid records of the second entity (site), so that only agent records with an existing site_id get to proceed.

The setup works fine with the local runner (even though the inputs are not sorted). With the Beam-Direct runner, however, the fields get mixed up: in this case, the values from site_name end up in the site_id column.
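To make the intended result concrete, here is a minimal, stdlib-only Java sketch of an inner join that keeps only agents whose site_id matches a valid site. It is a plain hash join (build a lookup from the sites, then probe it with each agent), so it does not depend on input order. It is not how Hop's Merge Join or its Beam handler is implemented. Rows are modelled as positional Object[] arrays; the class name, column positions and sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an order-independent inner join (hash join).
// Not Hop's Merge Join and not its Beam handler.
final class HashInnerJoin {

  // Returns left fields followed by right fields for every pair of rows whose
  // key values are equal. Left rows without a match are dropped (inner join).
  static List<Object[]> innerJoin(
      List<Object[]> left, int leftKey, List<Object[]> right, int rightKey) {
    // Build phase: index the right side (e.g. valid sites) by key.
    Map<Object, List<Object[]>> index = new HashMap<>();
    for (Object[] r : right) {
      index.computeIfAbsent(r[rightKey], k -> new ArrayList<>()).add(r);
    }
    // Probe phase: look up each left row (e.g. an agent) by its key.
    List<Object[]> out = new ArrayList<>();
    for (Object[] l : left) {
      List<Object[]> matches = index.get(l[leftKey]);
      if (matches == null) {
        continue; // e.g. an agent whose site_id has no valid site
      }
      for (Object[] r : matches) {
        Object[] joined = new Object[l.length + r.length];
        System.arraycopy(l, 0, joined, 0, l.length);
        System.arraycopy(r, 0, joined, l.length, r.length);
        out.add(joined);
      }
    }
    return out;
  }
}
```

With hypothetical agents (1, 10, Alice), (2, 99, Bob), (3, 20, Carol) and sites (20, Mbabane), (10, San Jose), the call innerJoin(agents, 1, sites, 0) returns two rows, for Alice and Carol; Bob is dropped because site 99 does not exist. Shuffling either input changes only the output order, not which rows survive.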

cheers

Fabian

2022/09/02 12:35:33 - Hop - Pipeline opened.
2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
2022/09/02 12:35:33 - Hop - Started the pipeline execution.
2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException: 
2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
2022/09/02 12:35:33 - join-spike - 
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
2022/09/02 12:35:33 - join-spike - 	... 2 more
2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException: 
2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
2022/09/02 12:35:33 - join-spike - 
2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
2022/09/02 12:35:33 - join-spike - 
2022/09/02 12:35:33 - join-spike - 
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException: 
2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
2022/09/02 12:35:33 - join-spike - 
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
2022/09/02 12:35:33 - join-spike - 	... 15 more
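The error above can be read as a row-layout mismatch: a Hop row is a positional Object[] described by separate row metadata, and the later Beam message in this thread ("Error getting String from field site_id Integer on index 1 ...") points at a String sitting where the metadata expects an Integer. Hop Integer values are java.lang.Long objects, as the ClassCastException quoted further down shows. The hypothetical check below only illustrates what such a message means; it is not Hop's ValueMetaBase or RowMeta code and does not identify the actual cause of the bug.

```java
import java.util.List;

// Hypothetical illustration of checking a positional row against its metadata.
// Not Hop's ValueMetaBase/RowMeta code.
final class RowLayoutCheck {

  // One metadata entry per row position: field name plus the Java class its values must have.
  static final class FieldMeta {
    final String name;
    final Class<?> type;

    FieldMeta(String name, Class<?> type) {
      this.name = name;
      this.type = type;
    }
  }

  // Returns a description of the first inconsistency, or null if the row matches the metadata.
  static String firstMismatch(List<FieldMeta> meta, Object[] row) {
    if (meta.size() != row.length) {
      return "row has " + row.length + " values but metadata describes " + meta.size();
    }
    for (int i = 0; i < row.length; i++) {
      FieldMeta field = meta.get(i);
      Object value = row[i];
      // null values are allowed for any type
      if (value != null && !field.type.isInstance(value)) {
        return field.name + " on index " + i + ": expected " + field.type.getSimpleName()
            + " but found " + value.getClass().getSimpleName() + " [" + value + "]";
      }
    }
    return null;
  }
}
```

For metadata (id Long, site_id Long, site_name String), the row {1L, 10L, "San Jose"} passes, while {1L, "San Jose", 10L} is reported as "site_id on index 1: expected Long but found String [San Jose]".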



> On 02.09.2022 at 09:48, Fabian Peters <po...@mercadu.de> wrote:
> 
> Hi Hans,
> 
> That stack trace was due to user error and I'm glad it was. ;)
> There was one "id" field too many coming into the merge join…
> 
> Sorry for the noise!
> 
> cheers
> 
> Fabian
> 
>> On 02.09.2022 at 09:31, Fabian Peters <post@mercadu.de> wrote:
>> 
>> Hi Hans,
>> 
>> Thanks! I probably read that at some point, but the "Notice" modal popping up when closing the "Merge join" dialogue probably convinced me otherwise: "If the incoming data is not sorted on the specified keys, the output results may not be correct. We recommend sorting the incoming data within the pipeline."
>> 
>> I'm testing it in my pipeline now and am getting a stack trace (see below). The "site_id" field is from an "Avro decode" transform and is a plain Integer. Using the local runner and writing to Postgres this works fine.
>> 
>> cheers
>> 
>> Fabian
>> 
>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException: 
>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - 
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 09:20:20 - General - 	... 2 more
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException: 
>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General - 
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>> 2022/09/02 09:20:20 - General - 	... 17 more
>> 
>>> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>>> 
>>> Hi Fabian,
>>> 
>>> Merge join does not require your data to be sorted when executing on Beam 
>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>> 
>>> Cheers,
>>> Hans
>>> 
>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <post@mercadu.de> wrote:
>>> Good morning Matt,
>>> 
>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so the Merge Join transform is not an option. I guess I'll have to use temporary BigQuery tables to handle this; those pipelines are all bounded, so that would work. Or is there an easy way to sort data when running on Beam?
>>> 
>>> I'll create a Jira ticket, no problem.
>>> 
>>> cheers
>>> 
>>> Fabian
>>> 
>>>> On 01.09.2022 at 19:11, Matt Casters <matt.casters@neo4j.com> wrote:
>>>> 
>>>> Hi Fabian,
>>>> 
>>>> Joining rows is indeed the exception in Beam. I would suggest you use the Merge Join <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> transform.
>>>> For unbounded (never-ending) pipelines, that transform will be handled <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java> correctly.
>>>> If you don't mind, please create a JIRA case so we can create a similar handler for the Cartesian product use case.
>>>> The code is usually non-trivial in the massively parallel world, but quite doable ;-)
>>>> 
>>>> All the best,
>>>> Matt
>>>> 
>>>> 
>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <post@mercadu.de> wrote:
>>>> Hi all,
>>>> 
>>>> I've hit the next problem, this time something I thought I had tested on Beam before: a pipeline containing a "Join rows (cartesian product)" transform with input from two sources loops forever when run via Beam-Direct or Dataflow. It works fine using the local runner.
>>>> 
>>>> While running it on Beam-Direct, I attached a debugger and can see that it is stuck in the while loop at JoinRows.java:486 <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>. I've tried using a GCS temp directory and swapping the "Main transform to read from", but neither helped.
>>>> 
>>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>>> 
>>>> cheers
>>>> 
>>>> Fabian
>>>> 
>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Neo4j Chief Solutions Architect
>>>> ✉   matt.casters@neo4j.com
>>>> 
>>>> 
>>>> 
>>> 
>> 
> 
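The "Notice" from the Merge join dialog quoted above reflects how a classic sort-merge join works: it walks both inputs once with two cursors and advances whichever side has the smaller key, so a key that arrives out of order can be skipped silently. The textbook sketch below (hypothetical class name, keys only, no row payloads) shows this effect; it makes no claim about how Hop's local or Beam engines implement Merge Join. According to the Hop documentation Hans linked, sorting is not required when running on Beam.

```java
import java.util.ArrayList;
import java.util.List;

// Textbook sort-merge inner join on long keys (hypothetical class name).
// Correct only if BOTH inputs are sorted ascending; it does not check this.
final class SortMergeJoin {

  // Returns the key once for every matching (left, right) pair.
  static List<Long> join(long[] left, long[] right) {
    List<Long> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.length && j < right.length) {
      if (left[i] < right[j]) {
        i++; // left key is smaller: assumed to have no partner further right
      } else if (left[i] > right[j]) {
        j++; // right key is smaller: assumed to have no partner further left
      } else {
        // Emit every right row in the run of equal keys.
        for (int k = j; k < right.length && right[k] == left[i]; k++) {
          out.add(left[i]);
        }
        i++; // keep j so a following left row with the same key matches the run again
      }
    }
    return out;
  }
}
```

With left keys {10, 20, 99} and right keys {10, 20}, the sketch returns [10, 20]. With the left side in the order {20, 10, 99}, it returns only [20], silently losing the match on 10. Sorting both inputs first (for example with java.util.Arrays.sort) restores the correct result.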


Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Fabian Peters <po...@mercadu.de>.
Hi Matt,

Sure, will do. Get well soon!

Fabian
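Earlier in the thread, one stack trace turned out to be caused by one "id" field too many coming into the merge join. A pre-flight check along the lines of the hypothetical sketch below (names are illustrative, not a Hop API) lists field names that appear on both join inputs apart from the join keys, which is one way to spot such a collision before submitting the pipeline to a runner.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight check, not a Hop API: finds field names present on
// both join inputs, ignoring the join keys that are expected on both sides.
final class JoinFieldCheck {

  static List<String> duplicateNonKeyFields(
      List<String> leftFields, List<String> rightFields, Set<String> joinKeys) {
    Set<String> left = new HashSet<>(leftFields);
    List<String> duplicates = new ArrayList<>();
    for (String name : rightFields) {
      // Report each colliding name once, in right-input order.
      if (left.contains(name) && !joinKeys.contains(name) && !duplicates.contains(name)) {
        duplicates.add(name);
      }
    }
    return duplicates;
  }
}
```

For example, hypothetical inputs with fields (id, site_id, agent_name) and (id, site_id, site_name), joined on site_id, would report [id].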

> On 02.09.2022 at 15:21, Matt Casters <ma...@neo4j.com> wrote:
> 
> Could you create a JIRA case for this? Bug reports are really more at home there.
> I do know that the Beam Join operator behaves a bit differently, so there's probably an edge case triggered somewhere.
> In principle this is a 5-minute thing to look at and fix, but I'm struggling with a bit of covid and just can't look at it right now.
> 
> All the best,
> Matt
> 
> On Fri, Sep 2, 2022 at 3:14 PM Fabian Peters <post@mercadu.de> wrote:
> Hi Matt,
> 
> Thanks for your quick reply!
> 
> The "Write to log" transform actually works fine for me with the local, Beam-Direct and Dataflow runners.
> 
> I removed the "Write to log" transforms from my test case and replaced the last one with a Beam Output transform. The issue stays the same; see the stack trace below…
> 
> cheers
> 
> Fabian
> 
> 
> 2022/09/02 15:05:06 - Hop - Pipeline opened.
> 2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 15:05:06 - Hop - Started the pipeline execution.
> 2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name 'join-spike'
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
> 2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) : Select values, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid agents, gets data from Select values
> 2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 15:05:14 - join-spike - ERROR: org.apache.hop.core.exception.HopException: 
> 2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner Direct
> 2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - 
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 15:05:14 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 15:05:14 - join-spike - 	... 2 more
> 2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopException: 
> 2022/09/02 15:05:14 - join-spike - Error getting String from field site_id Integer on index 1 in input: [id Integer], [site_id Integer], [site_name String], [telephone String], [agent_name String], native value found: Mbabane
> 2022/09/02 15:05:14 - join-spike - 
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike - 
> 2022/09/02 15:05:14 - join-spike - 
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 15:05:14 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException: 
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike - 
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
> 2022/09/02 15:05:14 - join-spike - 	... 13 more
> 
> 
>> On 02.09.2022 at 13:19, Matt Casters <matt.casters@neo4j.com> wrote:
>> 
>> Hi Fabian,
>> 
>> If you remove the "Write to log" transforms, the pipeline will work. We typically don't use that transform, as the log ends up on a Spark/Flink node somewhere where you can't see the information anyway. It's the main reason why I'm working on HOP-4024.
>>
>> Also, just as a reference, there is an example in the samples project under beam/pipelines called complex.hpl, which contains a merge join.
>> 
>> Best of luck!
>> Matt
>> 
>> 
>> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <post@mercadu.de> wrote:
>> Hi once more,
>> 
>> I feel a little bit like I've started my slow descent into madness. What I assumed was a configuration error now looks like it isn't one. I've put together a minimal test case project <https://github.com/fbarthez/BeamJoinSpike> showing the same behaviour I'm seeing in my project. I'd be very grateful if someone could take a look and tell me whether I'm taking a wrong turn somewhere or whether there really is a bug.
>> 
>> What I'm trying to achieve: There are records of one entity (agent) that come with a site_id field. Some site_id values do not correspond to a valid site. The agent records are to be joined with valid records of the second entity (site), so that only agent records with an existing site_id get to proceed.
>> 
>> The setup works fine with the local runner (even though the inputs are not sorted). With the Beam-Direct runner, however, the fields get mixed up: in this case, the values from site_name end up in the site_id column.
>> 
>> cheers
>> 
>> Fabian
>> 
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.
>> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
>> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
>> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
>> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
>> 2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException: 
>> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
>> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - 
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 12:35:33 - join-spike - 	... 2 more
>> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException: 
>> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
>> 2022/09/02 12:35:33 - join-spike - 
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike - 
>> 2022/09/02 12:35:33 - join-spike - 
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException: 
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike - 
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
>> 2022/09/02 12:35:33 - join-spike - 	... 15 more
>> 
>> 
>> 
>>> On 02.09.2022 at 09:48, Fabian Peters <post@mercadu.de> wrote:
>>> 
>>> Hi Hans,
>>> 
>>> That stack trace was due to user error and I'm glad it was. ;)
>>> There was one "id" field too many coming into the merge join…
>>> 
>>> Sorry for the noise!
>>> 
>>> cheers
>>> 
>>> Fabian
>>> 
>>>> On 02.09.2022 at 09:31, Fabian Peters <post@mercadu.de> wrote:
>>>> 
>>>> Hi Hans,
>>>> 
>>>> Thanks! I probably read that at some point, but the "Notice" modal popping up when closing the "Merge join" dialogue probably convinced me otherwise: "If the incoming data is not sorted on the specified keys, the output results may not be correct. We recommend sorting the incoming data within the pipeline."
>>>> 
>>>> I'm testing it in my pipeline now and am getting a stack trace (see below). The "site_id" field is from an "Avro decode" transform and is a plain Integer. Using the local runner and writing to Postgres this works fine.
>>>> 
>>>> cheers
>>>> 
>>>> Fabian
>>>> 
>>>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>>>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException: 
>>>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>>>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>>> 2022/09/02 09:20:20 - General - 
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>>>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>>>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>>>> 2022/09/02 09:20:20 - General - 	... 2 more
>>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException: 
>>>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>>>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>>>> 2022/09/02 09:20:20 - General - 
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>>>> 2022/09/02 09:20:20 - General - 	... 17 more
>>>> 
>>>>> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>>>>> 
>>>>> Hi Fabian,
>>>>> 
>>>>> Merge join does not require your data to be sorted when executing on Beam 
>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>>>> 
>>>>> Cheers,
>>>>> Hans
>>>>> 
>>>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <post@mercadu.de> wrote:
>>>>> Good morning Matt,
>>>>> 
>>>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so the Merge Join transform is not an option. I guess I'll have to use temporary BigQuery tables to handle this. Those pipelines are all bounded, so this is an option. Or is there an easy option to sort things when running on Beam?
>>>>> 
>>>>> I'll create a Jira ticket, no problem.
>>>>> 
>>>>> cheers
>>>>> 
>>>>> Fabian
>>>>> 
>>>>>> On 01.09.2022 at 19:11, Matt Casters <matt.casters@neo4j.com> wrote:
>>>>>> 
>>>>>> Hi Fabian,
>>>>>> 
>>>>>> Joining rows is indeed the exception in Beam.  I would suggest you use the Merge Join <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> transforms.
>>>>>> For unbounded pipelines (never ending) that transform will be handled <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java> correctly. 
>>>>>> If you don't mind, please create a JIRA case so we can create a similar handler for the Cartesian product use-case.
>>>>>> The code is usually non-trivial in the massively parallel world but quite doable ;-)
>>>>>> 
>>>>>> All the best,
>>>>>> Matt
>>>>>> 
>>>>>> 
>>>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <post@mercadu.de> wrote:
>>>>>> Hi all,
>>>>>> 
>>>>>> I've hit the next problem, this time something I thought I had tested on Beam before: A pipeline containing a "Join rows (cartesian product)" transform with input from two sources loops forever when run via Beam-Direct or Dataflow. It works fine using the local runner.
>>>>>> 
>>>>>> While running it on Beam-Direct I've attached a debugger and can see that it is stuck in the while loop at JoinRows.java:486 <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>. I've tried using a GCS temp directory and swapped the "Main transform to read from" but none of those helped.
>>>>>> 
>>>>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>>>>> 
>>>>>> cheers
>>>>>> 
>>>>>> Fabian
>>>>>> 
>>>>>> <PastedGraphic-8.png>
>>>>> 
>>>>>> 
>>>>>> 
>>>>>> -- 
>>>>>> Neo4j Chief Solutions Architect
>>>>>> ✉   matt.casters@neo4j.com
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
>> 
>> -- 
>> Neo4j Chief Solutions Architect
>> ✉   matt.casters@neo4j.com
>> 
>> 
>> 
> 
> 
> 
> -- 
> Neo4j Chief Solutions Architect
> ✉   matt.casters@neo4j.com
> 
> 
> 


Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Matt Casters <ma...@neo4j.com>.
Could you create a JIRA case for this?  Bug reports are really more at home
there.
I do know that the Beam Join operator behaves a bit differently, so there's
probably an edge case triggered somewhere.
In principle this is a 5-minute thing to look at and fix, but I'm struggling
with a bit of covid and I just can't look at it right now.

All the best,
Matt
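For readers following the thread: the goal Fabian describes (keep only agents whose site_id matches a valid site) is an inner join on site_id. A join computed by grouping or hashing rows on the key, rather than by merging two sorted streams, does not depend on input order, which is one way to read Hans's note that Merge join does not need sorted input on Beam. The plain-Java sketch below shows only those join semantics on made-up rows whose field names are modeled on the error messages in this thread. It is not Hop's Merge Join or Beam's join implementation, and SiteJoinSketch, Agent, Site, ValidAgent and joinAgentsWithValidSites are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an inner join on site_id; not Hop's Merge Join or Beam's join code.
public class SiteJoinSketch {

    // Stand-in for an "agent" row (id, site_id, agent_name).
    static final class Agent {
        final long id;
        final long siteId;
        final String agentName;

        Agent(long id, long siteId, String agentName) {
            this.id = id;
            this.siteId = siteId;
            this.agentName = agentName;
        }
    }

    // Stand-in for a valid "site" row (site_id, site_name).
    static final class Site {
        final long siteId;
        final String siteName;

        Site(long siteId, String siteName) {
            this.siteId = siteId;
            this.siteName = siteName;
        }
    }

    // Joined output row: agent fields plus the matching site's name.
    static final class ValidAgent {
        final long id;
        final long siteId;
        final String siteName;
        final String agentName;

        ValidAgent(long id, long siteId, String siteName, String agentName) {
            this.id = id;
            this.siteId = siteId;
            this.siteName = siteName;
            this.agentName = agentName;
        }

        @Override
        public String toString() {
            return id + "," + siteId + "," + siteName + "," + agentName;
        }
    }

    // Builds a hash index of sites keyed by site_id, then probes it once per agent.
    // Neither input has to be sorted; agents without a matching site are dropped.
    static List<ValidAgent> joinAgentsWithValidSites(List<Agent> agents, List<Site> sites) {
        Map<Long, List<Site>> sitesById = new HashMap<>();
        for (Site site : sites) {
            sitesById.computeIfAbsent(site.siteId, key -> new ArrayList<>()).add(site);
        }
        List<ValidAgent> result = new ArrayList<>();
        for (Agent agent : agents) {
            List<Site> matches = sitesById.get(agent.siteId);
            if (matches == null) {
                continue; // no valid site with this site_id: the inner join discards the agent
            }
            for (Site site : matches) {
                result.add(new ValidAgent(agent.id, agent.siteId, site.siteName, agent.agentName));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Deliberately unsorted inputs; site 99 does not exist.
        List<Site> sites = List.of(new Site(2, "Mbabane"), new Site(1, "San Jose"));
        List<Agent> agents = List.of(
                new Agent(10, 1, "Alice"),
                new Agent(11, 99, "Bob"),
                new Agent(12, 2, "Carol"));
        for (ValidAgent row : joinAgentsWithValidSites(agents, sites)) {
            System.out.println(row);
        }
    }
}
```

Running main prints one line per agent with a valid site ("10,1,San Jose,Alice" and "12,2,Mbabane,Carol"). The agent that points at the non-existent site 99 is dropped, even though neither input was sorted.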

On Fri, Sep 2, 2022 at 3:14 PM Fabian Peters <po...@mercadu.de> wrote:

> Hi Matt,
>
> Thanks for your quick reply!
>
> The "Write to log" transform actually works fine for me with the local,
> Beam-Direct and Dataflow runners.
>
> I removed it from my test case and replaced the last one with a Beam
> Output transform. The issue stays the same, see stack trace below…
>
> cheers
>
> Fabian
>
>
> 2022/09/02 15:05:06 - Hop - Pipeline opened.
> 2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 15:05:06 - Hop - Started the pipeline execution.
> 2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name 'join-spike'
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
> 2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) : Select values, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid agents, gets data from Select values
> 2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 15:05:14 - join-spike - ERROR: org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner Direct
> 2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 15:05:14 - join-spike - ... 2 more
> 2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error getting String from field site_id Integer on index 1 in input: [id Integer], [site_id Integer], [site_name String], [telephone String], [agent_name String], native value found: Mbabane
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
> 2022/09/02 15:05:14 - join-spike - ... 13 more
>
>
> On 02.09.2022 at 13:19, Matt Casters <ma...@neo4j.com> wrote:
>
> Hi Fabian,
>
> If you remove the "Write to log" transforms, the pipeline will work.  We
> typically don't use that transform because the log ends up on a Spark/Flink node
> somewhere where you can't see the information anyway.  It's the main reason
> why I'm working on HOP-4024.
>
> Also, just as a reference, there is an example in the samples project under
> beam/pipelines called complex.hpl which contains a merge join.
>
> Best of luck!
> Matt
>
>
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <po...@mercadu.de> wrote:
>
>> Hi once more,
>>
>> I feel a little bit like I've started my slow descent into madness. What
>> I supposed to be a configuration error now looks like it's not. I've put
>> together a minimal test case project
>> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour
>> I'm seeing in my project. I'd be very grateful if someone could take a look
>> and tell me whether I'm taking a wrong turn somewhere or whether there
>> really is a bug.
>>
>> What I'm trying to achieve: There are records of one entity (agent) that
>> come with a site_id field. Some site_id values do not correspond to a valid
>> site. The agent records are to be joined with valid records of the second
>> entity (site), so that only agent records with an existing site_id get to
>> proceed.
>>
>> The setup works fine with the local runner (even though the inputs are
>> not sorted). With the Beam-Direct runner, the fields are getting mixed up,
>> in this case the values from site_name end up in the site_id column.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.
>> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
>> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
>> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
>> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
>> 2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
>> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 12:35:33 - join-spike - ... 2 more
>> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
>> 2022/09/02 12:35:33 - join-spike - ... 15 more
>>
>>
>>
>> On 02.09.2022 at 09:48, Fabian Peters <po...@mercadu.de> wrote:
>>
>> Hi Hans,
>>
>> That stack trace was due to user error and I'm glad it was. ;)
>> There was one "id" field too many coming into the merge join…
>>
>> Sorry for the noise!
>>
>> cheers
>>
>> Fabian
>>
>> On 02.09.2022 at 09:31, Fabian Peters <po...@mercadu.de> wrote:
>>
>> Hi Hans,
>>
>> Thanks! I probably read that at some point, but the "Notice" modal
>> popping up when closing the "Merge join" dialogue probably convinced me
>> otherwise: "If the incoming data is not sorted on the specified keys, the
>> output results may not be correct. We recommend sorting the incoming data
>> within the pipeline."
>>
>> I'm testing it in my pipeline now and am getting a stack trace (see
>> below). The "site_id" field is from an "Avro decode" transform and is a
>> plain Integer. Using the local runner and writing to Postgres this works
>> fine.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 09:20:20 - General - ... 2 more
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>> 2022/09/02 09:20:20 - General - ... 17 more
>>
>> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>>
>> Hi Fabian,
>>
>> Merge join does not require your data to be sorted when executing on Beam
>>
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>
>> Cheers,
>> Hans
>>
>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <po...@mercadu.de> wrote:
>>
>>> Good morning Matt,
>>>
>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so
>>> the Merge Join transform is not an option. I guess I'll have to use
>>> temporary BigQuery tables to handle this. Those pipelines are all bounded,
>>> so this is an option. Or is there an easy option to sort things when
>>> running on Beam?
>>>
>>> I'll create a Jira ticket, no problem.
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>> On 01.09.2022 at 19:11, Matt Casters <ma...@neo4j.com> wrote:
>>>
>>> Hi Fabian,
>>>
>>> Joining rows is indeed the exception in Beam.  I would suggest you use
>>> the Merge Join
>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>>> transforms.
>>> For unbounded pipelines (never ending) that transform will be handled
>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>> correctly.
>>> If you don't mind, please create a JIRA case so we can create a similar
>>> handler for the Cartesian product use-case.
>>> The code is usually non-trivial in the massively parallel world, but
>>> quite doable ;-)
>>>
>>> All the best,
>>> Matt
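A Cartesian product is awkward in a parallel engine because every row of one input has to meet every row of the other. One common Beam pattern is to give every worker the smaller input as a side input, for example through `View.asList()`, and pair it with each element of the larger input. The sketch below shows only that pairing step in plain Java. `CrossJoinSketch` is a hypothetical name, and the sketch does not describe how Hop implements or will implement this.

```java
import java.util.ArrayList;
import java.util.List;

// Pairing step of a broadcast-style cross join: the small side is available
// in full, and every row of the large side is combined with each of its rows.
final class CrossJoinSketch {
    static List<Object[]> cartesian(Iterable<Object[]> large, List<Object[]> smallBroadcast) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] a : large) {
            for (Object[] b : smallBroadcast) {
                // Output layout: fields of the large-side row, then the small-side row.
                Object[] joined = new Object[a.length + b.length];
                System.arraycopy(a, 0, joined, 0, a.length);
                System.arraycopy(b, 0, joined, a.length, b.length);
                out.add(joined);
            }
        }
        return out;
    }
}
```

This only works while the broadcast side fits in memory on each worker. That is one reason the general case is harder.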
>>>
>>>
>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <po...@mercadu.de> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've hit the next problem, this time something I thought I had tested
>>>> on Beam before: A pipeline containing a "Join rows (cartesian product)"
>>>> transform with input from two sources, loops forever when run via
>>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>>
>>>> While running it on Beam-Direct I've attached a debugger and can see
>>>> that it is stuck in the while loop at JoinRows.java:486
>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>> I've tried using a GCS temp directory and swapping the "Main transform
>>>> to read from", but neither helped.
>>>>
>>>> Is this transform incompatible with Beam? If so, what could I use
>>>> instead?
>>>>
>>>> cheers
>>>>
>>>> Fabian
>>>>
>>>> <PastedGraphic-8.png>
>>>>
>>>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
>>> *✉   *matt.casters@neo4j.com
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *matt.casters@neo4j.com
>
>
>
>
>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.casters@neo4j.com

Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Hans Van Akelyen <ha...@gmail.com>.
Hi All,
To follow up on this thread: this was picked up in the following ticket [1]
and has been fixed for what will become the 2.1 release.

Kind regards,
Hans

[1] https://issues.apache.org/jira/browse/HOP-3983


On Mon, 5 Sept 2022 at 12:04, <po...@gmx.com> wrote:

> I think Merge Join is not working with Beam. I spent many hours trying to
> make it work without success.
> For this reason I decided to go back to Kettle - will be back when it's
> solved.
>
> Regards,
>
> Mike
>
>
> *Sent:* Friday, September 02, 2022 at 3:14 PM
> *From:* "Fabian Peters" <po...@mercadu.de>
> *To:* users@hop.apache.org, "Matt Casters" <ma...@neo4j.com>
> *Subject:* Re: Merge join issue on Beam [was: Join rows transform loops
> forever on Beam]
> Hi Matt,
>
> Thanks for your quick reply!
>
> The "Write to log" transform actually works fine for me with the local,
> Beam-Direct and Dataflow runners.
>
> I removed it from my test case and replaced the last one with a Beam
> Output transform. The issue stays the same, see stack trace below…
>
> cheers
>
> Fabian
>
>
> 2022/09/02 15:05:06 - Hop - Pipeline opened.
> 2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 15:05:06 - Hop - Started the pipeline execution.
> 2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name
> 'join-spike'
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
> 2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) :
> Select values, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid
> agents, gets data from Select values
> 2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam
> Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 15:05:14 - join-spike - ERROR:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner
> Direct
> 2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error
> converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 15:05:14 - join-spike - ... 2 more
> 2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException:
> Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error getting String from field site_id
> Integer on index 1 in input: [id Integer], [site_id Integer], [site_name
> String], [telephone String], [agent_name String], native value found:
> Mbabane
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [Mbabane] does not
> correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.hop.core.exception.HopValueException:
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [Mbabane] does not
> correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
> 2022/09/02 15:05:14 - join-spike - ... 13 more
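The error message above states the problem directly. Hop rows are positional: each row is an `Object[]` that is read through the row metadata. Here index 1 is declared as `site_id Integer` but holds the site name `Mbabane`, so the values and the metadata disagree about the layout. Below is a hypothetical check, not Hop API, that reports the first slot that is missing or whose Java type does not match the declared type. The field list is copied from the message. The sample values in the usage are made up.

```java
import java.util.List;

// Hypothetical layout check, not Hop API.
final class LayoutCheck {
    // Declared field: name plus the Java class Hop stores for it
    // (Integer -> java.lang.Long, String -> java.lang.String).
    static final class Field {
        final String name;
        final Class<?> type;

        Field(String name, Class<?> type) {
            this.name = name;
            this.type = type;
        }
    }

    // Index of the first slot that is missing or whose value doesn't fit
    // its declared type; -1 if every non-null value fits.
    static int firstMismatch(List<Field> meta, Object[] row) {
        for (int i = 0; i < meta.size(); i++) {
            if (i >= row.length) {
                return i;
            }
            Object value = row[i];
            if (value != null && !meta.get(i).type.isInstance(value)) {
                return i;
            }
        }
        return -1;
    }
}
```

A row that lost its site_id value upstream fails at index 1, which matches the report above.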
>
>
>
> On 02.09.2022 at 13:19, Matt Casters <ma...@neo4j.com> wrote:
>
> Hi Fabian,
>
> If you remove the "Write to log" transforms, the pipeline will work. We
> typically don't use that transform, as the log ends up on a Spark/Flink node
> somewhere where you can't see the information anyway. It's the main reason
> why I'm working on HOP-4024.
>
> Also, just as a reference, there is an example in the samples project under
> beam/pipelines called complex.hpl which contains a merge join.
>
> Best of luck!
> Matt
>
>
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <po...@mercadu.de> wrote:
>
>> Hi once more,
>>
>> I feel a little bit like I've started my slow descent into madness. What
>> I assumed to be a configuration error now looks like it's not. I've put
>> together a minimal test case project
>> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour
>> I'm seeing in my project. I'd be very grateful if someone could take a look
>> and tell me whether I'm taking a wrong turn somewhere or whether there
>> really is a bug.
>>
>> What I'm trying to achieve: There are records of one entity (agent) that
>> come with a site_id field. Some site_id values do not correspond to a valid
>> site. The agent records are to be joined with valid records of the second
>> entity (site), so that only agent records with an existing site_id get to
>> proceed.
>>
>> The setup works fine with the local runner (even though the inputs are
>> not sorted). With the Beam-Direct runner, the fields are getting mixed up,
>> in this case the values from site_name end up in the site_id column.
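This requirement is essentially a semi-join: an agent passes only if its `site_id` exists among the valid sites. When only the filtering matters, that reduces to a set lookup. Below is a minimal plain-Java sketch. The field positions are assumptions: `site_id` at index 1 in agent rows and at index 0 in site rows. The sketch says nothing about how the Beam engine executes the join.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Semi-join sketch: keep agent rows whose site_id appears among the valid sites.
final class SemiJoinSketch {
    static final int AGENT_SITE_ID = 1; // assumed position of site_id in an agent row
    static final int SITE_ID = 0;       // assumed position of site_id in a site row

    static List<Object[]> agentsWithValidSite(List<Object[]> agents, List<Object[]> sites) {
        // Collect the ids of all valid sites once.
        Set<Object> validIds = new HashSet<>();
        for (Object[] site : sites) {
            validIds.add(site[SITE_ID]);
        }
        // Keep only agents pointing at one of those ids; input order doesn't matter.
        return agents.stream()
            .filter(agent -> validIds.contains(agent[AGENT_SITE_ID]))
            .collect(Collectors.toList());
    }
}
```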
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.
>> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
>> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
>> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name
>> 'join-spike'
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge
>> join
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam
>> Pipeline Engine with run configuration 'Beam-Direct'
>> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
>> 2022/09/02 12:35:33 - join-spike - ERROR:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner
>> Direct
>> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error
>> executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 12:35:33 - join-spike -  ... 2 more
>> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException:
>> Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
>> 2022/09/02 12:35:33 - join-spike - Caused by:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a
>> single threaded pipeline
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data
>> type error: the data type of java.lang.String object [San Jose] does
>> not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
>> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by:
>> org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data
>> type error: the data type of java.lang.String object [San Jose] does
>> not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
>> 2022/09/02 12:35:33 - join-spike -
>> at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
>> 2022/09/02 12:35:33 - join-spike -  ... 15 more
>>
>>
>>
>>
>> On 02.09.2022 at 09:48, Fabian Peters <po...@mercadu.de> wrote:
>>
>> Hi Hans,
>>
>> That stack trace was due to user error and I'm glad it was. ;)
>> There was one "id" field too many coming into the merge join…
>>
>> Sorry for the noise!
>>
>> cheers
>>
>> Fabian
>>
>>
>> On 02.09.2022 at 09:31, Fabian Peters <po...@mercadu.de> wrote:
>>
>> Hi Hans,
>>
>> Thanks! I probably read that at some point, but the "Notice" modal
>> popping up when closing the "Merge join" dialogue must have convinced me
>> otherwise: "If the incoming data is not sorted on the specified keys, the
>> output results may not be correct. We recommend sorting the incoming data
>> within the pipeline."
>>
>> I'm testing it in my pipeline now and am getting a stack trace (see
>> below). The "site_id" field is from an "Avro decode" transform and is a
>> plain Integer. Using the local runner and writing to Postgres this works
>> fine.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>> 2022/09/02 09:20:20 - General - ERROR:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner
>> Direct
>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error
>> converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 09:20:20 - General -  ... 2 more
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException:
>> Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - Caused by:
>> org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 09:20:20 - General - Unexpected conversion error while
>> converting value [site_id Integer] to an Integer
>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to
>> class java.lang.Long (java.lang.String and java.lang.Long are in
>> module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> 2022/09/02 09:20:20 - General -
>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 09:20:20 - General -
>> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 09:20:20 - General -
>> at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 09:20:20 - General -
>> at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException:
>> class java.lang.String cannot be cast to class
>> java.lang.Long (java.lang.String and java.lang.Long are in module java.base
>> of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>> 2022/09/02 09:20:20 - General -  ... 17 more
>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *matt.casters@neo4j.com
>
>
>
>
>
>
>

Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by po...@gmx.com.
I think Merge Join is not working with Beam. I spent many hours trying to make
it work without success.

For this reason I decided to go back to Kettle - will be back when it's
solved.

Regards,

Mike


**Sent:**  Friday, September 02, 2022 at 3:14 PM  
**From:**  "Fabian Peters" <po...@mercadu.de>  
**To:**  users@hop.apache.org, "Matt Casters" <ma...@neo4j.com>  
**Subject:**  Re: Merge join issue on Beam [was: Join rows transform loops
forever on Beam]

Hi Matt,

Thanks for your quick reply!

The "Write to log" transform actually works fine for me with the local,
Beam-Direct and Dataflow runners.

I removed it from my test case and replaced the last one with a Beam Output
transform. The issue stays the same, see stack trace below…

cheers

Fabian

2022/09/02 15:05:06 - Hop - Pipeline opened.

2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...

2022/09/02 15:05:06 - Hop - Started the pipeline execution.

2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name 'join-
spike'

2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents

2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites

2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join

2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) : Select
values, gets data from 1 previous transform(s), targets=0, infos=0

2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid
agents, gets data from Select values

2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam
Pipeline Engine with run configuration 'Beam-Direct'

2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline

2022/09/02 15:05:14 - join-spike - ERROR:
org.apache.hop.core.exception.HopException:

2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner Direct

2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error
converting Hop data to string lines

2022/09/02 15:05:14 - join-spike -

2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)

2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)

2022/09/02 15:05:14 - join-spike -  at
java.base/java.lang.Thread.run(Thread.java:829)

2022/09/02 15:05:14 - join-spike - Caused by:
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.RuntimeException: Error converting Hop data to string lines

2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)

2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)

2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)

2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)

2022/09/02 15:05:14 - join-spike -  ... 2 more

2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException:
Error converting Hop data to string lines

2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)

2022/09/02 15:05:14 - join-spike - Caused by:
org.apache.hop.core.exception.HopException:

2022/09/02 15:05:14 - join-spike - Error getting String from field site_id
Integer on index 1 in input: [id Integer], [site_id Integer], [site_name
String], [telephone String], [agent_name String], native value found: Mbabane
2022/09/02 15:05:14 - join-spike -
2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
error: the data type of java.lang.String object [Mbabane] does not correspond
to value meta [Integer]
2022/09/02 15:05:14 - join-spike -
2022/09/02 15:05:14 - join-spike -
2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 15:05:14 - join-spike -  at
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 15:05:14 - join-spike -  at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 15:05:14 - join-spike -  at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 15:05:14 - join-spike -  at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 15:05:14 - join-spike -  at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 15:05:14 - join-spike -  at
java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 15:05:14 - join-spike - Caused by:
org.apache.hop.core.exception.HopValueException:
2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
error: the data type of java.lang.String object [Mbabane] does not correspond
to value meta [Integer]
2022/09/02 15:05:14 - join-spike -
2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
2022/09/02 15:05:14 - join-spike -  at
org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
2022/09/02 15:05:14 - join-spike -  ... 13 more
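The error above lists the layout the merge join declares for its output ([id Integer], [site_id Integer], [site_name String], [telephone String], [agent_name String]), yet the value found at index 1 is the site name "Mbabane". Hop rows are positional value arrays described by separate row metadata, so a value placed at the wrong position only fails later, when a downstream step reads that index with the declared type. The sketch below is a hypothetical diagnostic in plain Java, not a Hop API: `FieldType` and `checkRow` are illustrative names, and it assumes Integer fields hold `java.lang.Long` values (consistent with the "cannot be cast to class java.lang.Long" error in the BigQuery trace further down).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical field types for the check; assumption: Hop "Integer" values are
// java.lang.Long and "String" values are java.lang.String.
enum FieldType {
  INTEGER(Long.class),
  STRING(String.class);

  final Class<?> javaClass;

  FieldType(Class<?> javaClass) {
    this.javaClass = javaClass;
  }
}

// Diagnostic sketch (not Hop API): compare each positional value with its declared type.
class RowLayoutCheck {

  static List<String> checkRow(String[] names, FieldType[] types, Object[] row) {
    List<String> problems = new ArrayList<>();
    for (int i = 0; i < names.length; i++) {
      Object value = i < row.length ? row[i] : null;
      // Nulls are allowed for any type; only a non-null value of the wrong class is reported.
      if (value != null && !types[i].javaClass.isInstance(value)) {
        problems.add(names[i] + " " + types[i] + " on index " + i + ": found "
            + value.getClass().getName() + " [" + value + "]");
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    String[] names = {"id", "site_id", "site_name", "telephone", "agent_name"};
    FieldType[] types = {FieldType.INTEGER, FieldType.INTEGER, FieldType.STRING,
        FieldType.STRING, FieldType.STRING};
    // Illustrative shifted row: the site name sits where site_id is declared.
    Object[] shifted = {1L, "Mbabane", 10L, "555-0100", "Alice"};
    checkRow(names, types, shifted).forEach(System.out::println);
  }
}
```

For the shifted row, `main` reports two mismatches: site_id holding a String and site_name holding a Long, which is the kind of positional shift the log suggests.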





> On 02.09.2022 at 13:19, Matt Casters <matt.casters@neo4j.com> wrote:
>
> Hi Fabian,
>
> If you remove the "Write to log" transforms, the pipeline will work. We
> typically don't use that transform, as the log ends up on a Spark/Flink node
> somewhere where you can't see the information anyway. It's the main reason
> why I'm working on HOP-4024.
>
> Also, just as a reference, there is an example in the samples project under
> beam/pipelines called complex.hpl which contains a merge join.
>
> Best of luck!
> Matt
>
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <post@mercadu.de> wrote:
>

>> Hi once more,
>>
>> I feel a little bit like I've started my slow descent into madness. What I
>> took to be a configuration error now looks like it isn't. I've put together
>> a minimal test case project <https://github.com/fbarthez/BeamJoinSpike>
>> showing the same behaviour I'm seeing in my project. I'd be very grateful if
>> someone could take a look and tell me whether I'm taking a wrong turn
>> somewhere or whether there really is a bug.
>>
>> What I'm trying to achieve: there are records of one entity (agent) that
>> come with a site_id field. Some site_id values do not correspond to a valid
>> site. The agent records are to be joined with valid records of the second
>> entity (site), so that only agent records with an existing site_id get to
>> proceed.
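Semantically, this requirement is an inner join on site_id: agents whose site_id has no matching site are dropped. As an engine-independent illustration (plain Java, not Hop or Beam code), a hash join expresses it without sorting either input: index the valid sites by key, then probe with each agent. The row layouts in the comments are assumptions chosen to match the field list in the error message quoted in this thread; `ValidAgentJoin` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hash inner join sketch in plain Java (not Hop or Beam code). Assumed row layouts:
//   agent:  [id Long, site_id Long, telephone String, agent_name String]
//   site:   [site_id Long, site_name String]
//   output: [id, site_id, site_name, telephone, agent_name]
class ValidAgentJoin {

  static List<Object[]> join(List<Object[]> agents, List<Object[]> sites) {
    // Build phase: index valid sites by site_id (assumes site_id is unique among sites).
    Map<Long, String> siteNameById = new HashMap<>();
    for (Object[] site : sites) {
      siteNameById.put((Long) site[0], (String) site[1]);
    }
    // Probe phase: keep only agents whose site_id exists; input order is irrelevant.
    List<Object[]> out = new ArrayList<>();
    for (Object[] agent : agents) {
      Long siteId = (Long) agent[1];
      if (siteNameById.containsKey(siteId)) {
        out.add(new Object[] {agent[0], siteId, siteNameById.get(siteId), agent[2], agent[3]});
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Object[]> agents = new ArrayList<>();
    agents.add(new Object[] {1L, 10L, "555-0100", "Alice"});
    agents.add(new Object[] {2L, 99L, "555-0101", "Bob"}); // site 99 does not exist
    List<Object[]> sites = new ArrayList<>();
    sites.add(new Object[] {10L, "Mbabane"});
    for (Object[] row : join(agents, sites)) {
      System.out.println(Arrays.toString(row)); // [1, 10, Mbabane, 555-0100, Alice]
    }
  }
}
```

Keeping site_id in its declared position (index 1) with its declared type is exactly the property the Beam-Direct run loses in the logs above.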

>>
>> The setup works fine with the local runner (even though the inputs are not
>> sorted). With the Beam-Direct runner, the fields get mixed up: in this
>> case, the values from site_name end up in the site_id column.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.  
>  2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...  
>  2022/09/02 12:35:33 - Hop - Started the pipeline execution.  
>  2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name
> 'join-spike'  
>  2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites  
>  2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents  
>  2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log
> valid sites, gets data from 1 previous transform(s), targets=0, infos=0  
>  2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log
> all agents, gets data from 1 previous transform(s), targets=0, infos=0  
>  2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join  
>  2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log
> valid agents, gets data from 1 previous transform(s), targets=0, infos=0  
>  2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam
> Pipeline Engine with run configuration 'Beam-Direct'  
>  2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline  
>  2022/09/02 12:35:33 - join-spike - ERROR:
> org.apache.hop.core.exception.HopException:  
>  2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner
> Direct  
>  2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error
> executing TransformFn  
>  2022/09/02 12:35:33 - join-spike -  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)  
>  2022/09/02 12:35:33 - join-spike -  at
> java.base/java.lang.Thread.run(Thread.java:829)  
>  2022/09/02 12:35:33 - join-spike - Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.RuntimeException: Error executing TransformFn  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)  
>  2022/09/02 12:35:33 - join-spike -  ... 2 more  
>  2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException:
> Error executing TransformFn  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)  
>  2022/09/02 12:35:33 - join-spike - Caused by:
> org.apache.hop.core.exception.HopException:  
>  2022/09/02 12:35:33 - join-spike - Error performing an iteration in a
> single threaded pipeline  
>  2022/09/02 12:35:33 - join-spike -  
>  2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [San Jose] does not
> correspond to value meta [Integer]  
>  2022/09/02 12:35:33 - join-spike -  
>  2022/09/02 12:35:33 - join-spike -  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)  
>  2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)  
>  2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)  
>  2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)  
>  2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)  
>  2022/09/02 12:35:33 - join-spike -  at
> java.base/java.lang.Thread.run(Thread.java:829)  
>  2022/09/02 12:35:33 - join-spike - Caused by:
> org.apache.hop.core.exception.HopValueException:  
>  2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [San Jose] does not
> correspond to value meta [Integer]  
>  2022/09/02 12:35:33 - join-spike -  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)  
>  2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)  
>  2022/09/02 12:35:33 - join-spike -  ... 15 more  
>>
>>> On 02.09.2022 at 09:48, Fabian Peters <post@mercadu.de> wrote:
>>>
>>> Hi Hans,
>>>
>>> That stack trace was due to user error, and I'm glad it was. ;)
>>>
>>> There was one "id" field too many coming into the merge join…
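A merge join combines the fields of both inputs into one output row, so a field name present on both sides (here, "id") leaves the merged layout with two fields of the same name. A hypothetical pre-join check like the one below (plain Java; `FieldClashCheck` is an illustrative name, not a Hop API) lists the clashing names so they can be renamed or removed, for example with a Select values transform, before the join. It assumes the join output is the first input's fields followed by the second's, and the layouts in `main` are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-join check (plain Java, not a Hop API): report field names present in
// both inputs, assuming the join output is the first input's fields followed by the second's.
class FieldClashCheck {

  static List<String> clashingNames(List<String> first, List<String> second) {
    Set<String> firstNames = new LinkedHashSet<>(first);
    Set<String> clashes = new LinkedHashSet<>(); // keeps order, reports each name once
    for (String name : second) {
      if (firstNames.contains(name)) {
        clashes.add(name);
      }
    }
    return new ArrayList<>(clashes);
  }

  public static void main(String[] args) {
    // Illustrative layouts: both streams still carry their own "id" field.
    List<String> agents = Arrays.asList("id", "site_id", "telephone", "agent_name");
    List<String> sites = Arrays.asList("id", "site_name");
    System.out.println("Rename or remove before joining: " + clashingNames(agents, sites));
  }
}
```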

>>>
>>> Sorry for the noise!
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>>> On 02.09.2022 at 09:31, Fabian Peters <post@mercadu.de> wrote:
>>>>
>>>> Hi Hans,
>>>>
>>>> Thanks! I probably read that at some point, but the "Notice" modal that
>>>> pops up when closing the "Merge join" dialogue convinced me otherwise: "If
>>>> the incoming data is not sorted on the specified keys, the output results
>>>> may not be correct. We recommend sorting the incoming data within the
>>>> pipeline."
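The notice reflects how a classic merge join works: it walks both inputs in key order with two cursors and never looks back, so it only finds every match when both inputs are sorted on the join key. The minimal sketch below (plain Java with long keys and payloads; illustrative only, not Hop's implementation, and `SortMergeJoin` is a hypothetical name) makes the failure mode concrete: with one input out of order, matches are silently skipped rather than raising an error.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a classic sort-merge inner join on long keys (illustrative, not Hop's code).
// It ASSUMES both inputs are already sorted ascending by key and does not sort them,
// so out-of-order input silently loses matches instead of raising an error.
// Rows are long[] {key, payload}; output rows are {key, leftPayload, rightPayload}.
class SortMergeJoin {

  static List<long[]> join(List<long[]> left, List<long[]> right) {
    List<long[]> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      long lk = left.get(i)[0];
      long rk = right.get(j)[0];
      if (lk < rk) {
        i++; // left key is below every remaining right key: it has no match
      } else if (lk > rk) {
        j++; // right key is below the current left key: advance right
      } else {
        // Equal keys: pair this left row with every right row sharing the key,
        // then advance left only, so duplicate left keys see the same right run.
        for (int k = j; k < right.size() && right.get(k)[0] == lk; k++) {
          out.add(new long[] {lk, left.get(i)[1], right.get(k)[1]});
        }
        i++;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<long[]> sites = List.of(new long[] {10, 1}, new long[] {20, 2});
    List<long[]> agentsSorted = List.of(new long[] {10, 100}, new long[] {20, 200});
    List<long[]> agentsUnsorted = List.of(new long[] {20, 200}, new long[] {10, 100});
    System.out.println("sorted:   " + join(agentsSorted, sites).size() + " matches");
    System.out.println("unsorted: " + join(agentsUnsorted, sites).size() + " matches");
  }
}
```

Running `main` prints 2 matches for the sorted agents and 1 for the unsorted ones: key 10 is lost because the right cursor has already moved past it. A local run on unsorted input can therefore look correct on some data and drop rows on other data.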

>>>>
>>>> I'm testing it in my pipeline now and am getting a stack trace (see
>>>> below). The "site_id" field comes from an "Avro decode" transform and is a
>>>> plain Integer. Using the local runner and writing to Postgres, this works
>>>> fine.
>>>>
>>>> cheers
>>>>
>>>> Fabian
>>>>
>>>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline  
>  2022/09/02 09:20:20 - General - ERROR:
> org.apache.hop.core.exception.HopException:  
>  2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct  
>  2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error
> converting HopRow to BigQuery TableRow  
>  2022/09/02 09:20:20 - General -  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)  
>  2022/09/02 09:20:20 - General -  at
> java.base/java.lang.Thread.run(Thread.java:829)  
>  2022/09/02 09:20:20 - General - Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)  
>  2022/09/02 09:20:20 - General -  ... 2 more  
>  2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException:
> Error converting HopRow to BigQuery TableRow  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)  
>  2022/09/02 09:20:20 - General - Caused by:
> org.apache.hop.core.exception.HopValueException:  
>  2022/09/02 09:20:20 - General - Unexpected conversion error while
> converting value [site_id Integer] to an Integer  
>  2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to
> class java.lang.Long (java.lang.String and java.lang.Long are in module
> java.base of loader 'bootstrap')  
>  2022/09/02 09:20:20 - General -  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown
> Source)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)  
>  2022/09/02 09:20:20 - General -  at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)  
>  2022/09/02 09:20:20 - General -  at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)  
>  2022/09/02 09:20:20 - General -  at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)  
>  2022/09/02 09:20:20 - General -  at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)  
>  2022/09/02 09:20:20 - General -  at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)  
>  2022/09/02 09:20:20 - General -  at
> java.base/java.lang.Thread.run(Thread.java:829)  
>  2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException:
> class java.lang.String cannot be cast to class java.lang.Long
> (java.lang.String and java.lang.Long are in module java.base of loader
> 'bootstrap')  
>  2022/09/02 09:20:20 - General -  at
> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)  
>  2022/09/02 09:20:20 - General -  ... 17 more
>>>>
>>>>> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Merge join does not require your data to be sorted when executing on
>>>>> Beam:
>>>>> <https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms>
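One way to see why sorting is unnecessary on Beam: a distributed join can first group both inputs by key (in Beam, CoGroupByKey groups several keyed collections this way) and then combine the rows that landed under each key, so arrival order never matters. The sketch below models that grouping step with plain Java collections. It is a conceptual illustration under that assumption, not the code of Hop's BeamMergeJoinTransformHandler, and `GroupedJoin`/`Buckets` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model of a key-grouped inner join, i.e. the shape of a join built on a
// group-by-key step such as Beam's CoGroupByKey. Plain Java; names are hypothetical.
// Rows are String[] {key, value}.
class GroupedJoin {

  // Values from each side that share one key.
  static final class Buckets {
    final List<String> left = new ArrayList<>();
    final List<String> right = new ArrayList<>();
  }

  static List<String> innerJoin(List<String[]> left, List<String[]> right) {
    // Grouping step: route each row to its key's bucket, in whatever order rows arrive.
    // TreeMap only makes this demo's output order deterministic; a runner promises no order.
    Map<String, Buckets> byKey = new TreeMap<>();
    for (String[] row : left) {
      byKey.computeIfAbsent(row[0], k -> new Buckets()).left.add(row[1]);
    }
    for (String[] row : right) {
      byKey.computeIfAbsent(row[0], k -> new Buckets()).right.add(row[1]);
    }
    // Per-key step: pair every left value with every right value; keys missing on
    // either side produce nothing (inner join semantics).
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Buckets> entry : byKey.entrySet()) {
      for (String l : entry.getValue().left) {
        for (String r : entry.getValue().right) {
          out.add(entry.getKey() + ":" + l + "|" + r);
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String[]> agents = new ArrayList<>();
    agents.add(new String[] {"20", "Carol"});
    agents.add(new String[] {"99", "Bob"}); // no such site
    agents.add(new String[] {"10", "Alice"});
    List<String[]> sites = new ArrayList<>();
    sites.add(new String[] {"10", "Mbabane"});
    sites.add(new String[] {"20", "San Jose"});
    System.out.println(innerJoin(agents, sites));
  }
}
```

Despite the unsorted agents, `main` prints `[10:Alice|Mbabane, 20:Carol|San Jose]`; agent 99 has no site and is dropped.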

>>>>>
>>>>> Cheers,
>>>>>
>>>>> Hans
>>>>>
>>>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <post@mercadu.de> wrote:
>>>>>
>>>>>> Good morning Matt,
>>>>>>
>>>>>> Thanks for your quick reply! Unfortunately the inputs are not sorted,
>>>>>> so the Merge Join transform is not an option. I guess I'll have to use
>>>>>> temporary BigQuery tables to handle this. Those pipelines are all
>>>>>> bounded, so this is an option. Or is there an easy way to sort things
>>>>>> when running on Beam?
>>>>>>
>>>>>> I'll create a Jira ticket, no problem.
>>>>>>
>>>>>> cheers
>>>>>>
>>>>>> Fabian
>>>>>>
>>>>>>> On 01.09.2022 at 19:11, Matt Casters <matt.casters@neo4j.com> wrote:
>>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> Joining rows is indeed the exception in Beam. I would suggest you use
>>>>>>> the Merge Join
>>>>>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>>>>>>> transform.
>>>>>>>
>>>>>>> For unbounded (never-ending) pipelines, that transform will be handled
>>>>>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>>>>>> correctly.
>>>>>>>
>>>>>>> If you don't mind, please create a JIRA case so we can create a
>>>>>>> similar handler for the Cartesian product use-case.
>>>>>>>
>>>>>>> The code is usually non-trivial in the massively parallel world, but
>>>>>>> quite doable ;-)
>>>>>>>
>>>>>>> All the best,
>>>>>>> Matt
>>>>>>>
>>>>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <post@mercadu.de> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I've hit the next problem, this time with something I thought I had
>>>>>>>> tested on Beam before: a pipeline containing a "Join rows (cartesian
>>>>>>>> product)" transform with input from two sources loops forever when run
>>>>>>>> via Beam-Direct or Dataflow. It works fine using the local runner.
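A cartesian product pairs every row of one input with every row of the other, so an implementation must hold at least one input in full before its output is complete; if that input never signals its end, the join can wait indefinitely. The minimal sketch below uses plain Java lists (illustrative only; it does not reproduce Hop's JoinRows logic, and `CartesianProduct` is a hypothetical name): buffer one side, then stream the other side past it.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a cartesian product: one side is buffered completely, then every row of
// the other side is paired with every buffered row (|streamed| * |buffered| results).
// Plain Java for illustration; it does not reproduce Hop's JoinRows transform.
class CartesianProduct {

  static List<String> product(Iterable<String> streamed, List<String> buffered) {
    // Snapshot of the buffered side; it must be complete before pairing starts,
    // otherwise rows arriving later would miss their pairings with earlier rows.
    List<String> all = new ArrayList<>(buffered);
    List<String> out = new ArrayList<>();
    for (String s : streamed) {
      for (String b : all) {
        out.add(s + "/" + b);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> rows = product(List.of("a1", "a2"), List.of("s1", "s2", "s3"));
    System.out.println(rows.size() + " rows: " + rows);
  }
}
```

`main` prints `6 rows: [a1/s1, a1/s2, a1/s3, a2/s1, a2/s2, a2/s3]`. In distributed engines, one common approach is to broadcast the smaller input to every worker (Beam offers side inputs for this); this is a general technique, not a description of any Hop handler.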

>>>>>>>>
>>>>>>>> While running it on Beam-Direct, I attached a debugger and can see
>>>>>>>> that it is stuck in the while loop at JoinRows.java:486
>>>>>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>>>>>> I've tried using a GCS temp directory and swapping the "Main transform
>>>>>>>> to read from", but neither helped.
>>>>>>>>
>>>>>>>> Is this transform incompatible with Beam? If so, what could I use
>>>>>>>> instead?
>>>>>>>>
>>>>>>>> cheers
>>>>>>>>
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> <PastedGraphic-8.png>
>>>>>>>
>>>>>>> --
>>>>>>> Neo4j Chief Solutions Architect
>>>>>>> ✉ matt.casters@neo4j.com
>
> --
> Neo4j Chief Solutions Architect
> ✉ matt.casters@neo4j.com






Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Matt Casters <ma...@neo4j.com>.
Could you create a JIRA case for this? Bug reports are really more at home
there.
I do know that the Beam Join operator behaves a bit differently, so there's
probably an edge case being triggered somewhere.
In principle this is a 5-minute thing to look at and fix, but I'm struggling
with a bit of covid and I just can't look at it right now.

All the best,
Matt

On Fri, Sep 2, 2022 at 3:14 PM Fabian Peters <po...@mercadu.de> wrote:

> Hi Matt,
>
> Thanks for your quick reply!
>
> The "Write to log" transform actually works fine for me with the local,
> Beam-Direct and Dataflow runners.
>
> I removed it from my test case and replaced the last one with a Beam
> Output transform. The issue stays the same, see stack trace below…
>
> cheers
>
> Fabian
>
>
> 2022/09/02 15:05:06 - Hop - Pipeline opened.
> 2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 15:05:06 - Hop - Started the pipeline execution.
> 2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name
> 'join-spike'
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
> 2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) :
> Select values, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid
> agents, gets data from Select values
> 2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam
> Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 15:05:14 - join-spike - ERROR:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner
> Direct
> 2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error
> converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 15:05:14 - join-spike - ... 2 more
> 2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException:
> Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error getting String from field site_id
> Integer on index 1 in input: [id Integer], [site_id Integer], [site_name
> String], [telephone String], [agent_name String], native value found:
> Mbabane
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [Mbabane] does not
> correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.hop.core.exception.HopValueException:
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [Mbabane] does not
> correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
> 2022/09/02 15:05:14 - join-spike - ... 13 more
>
>
> On 02.09.2022 at 13:19, Matt Casters <ma...@neo4j.com> wrote:
>
> Hi Fabian,
>
> If you remove the "Write to log" transforms, the pipeline will work. We
> typically don't use that transform, as the log ends up on a Spark/Flink node
> somewhere where you can't see the information anyway. It's the main reason
> why I'm working on HOP-4024.
>
> Also, just as a reference, there is an example in the samples project under
> beam/pipelines called complex.hpl which contains a merge join.
>
> Best of luck!
> Matt
>
>
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <po...@mercadu.de> wrote:
>
>> Hi once more,
>>
>> I feel a little bit like I've started my slow descent into madness. What
>> I took to be a configuration error now looks like it isn't. I've put
>> together a minimal test case project
>> <https://github.com/fbarthez/BeamJoinSpike> showing the same behaviour
>> I'm seeing in my project. I'd be very grateful if someone could take a look
>> and tell me whether I'm taking a wrong turn somewhere or whether there
>> really is a bug.
>>
>> What I'm trying to achieve: There are records of one entity (agent) that
>> come with a site_id field. Some site_id values do not correspond to a valid
>> site. The agent records are to be joined with valid records of the second
>> entity (site), so that only agent records with an existing site_id get to
>> proceed.
>>
>> The setup works fine with the local runner (even though the inputs are
>> not sorted). With the Beam-Direct runner, the fields get mixed up: in this
>> case, the values from site_name end up in the site_id column.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.
>> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
>> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
>> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name
>> 'join-spike'
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge
>> join
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam
>> Pipeline Engine with run configuration 'Beam-Direct'
>> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
>> 2022/09/02 12:35:33 - join-spike - ERROR:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner
>> Direct
>> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error
>> executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 12:35:33 - join-spike - 	... 2 more
>> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
>> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
>> 2022/09/02 12:35:33 - join-spike - 	... 15 more
>>
>>
>>
>> On 02.09.2022 at 09:48, Fabian Peters <po...@mercadu.de> wrote:
>>
>> Hi Hans,
>>
>> That stack trace was due to user error and I'm glad it was. ;)
>> There was one "id" field too many coming into the merge join…
>>
>> Sorry for the noise!
>>
>> cheers
>>
>> Fabian
>>
>> On 02.09.2022 at 09:31, Fabian Peters <po...@mercadu.de> wrote:
>>
>> Hi Hans,
>>
>> Thanks! I probably read that at some point, but the "Notice" modal
>> popping up when closing the "Merge join" dialogue probably convinced me
>> otherwise: "If the incoming data is not sorted on the specified keys, the
>> output results may not be correct. We recommend sorting the incoming data
>> within the pipeline."
>>
>> I'm testing it in my pipeline now and am getting a stack trace (see
>> below). The "site_id" field is from an "Avro decode" transform and is a
>> plain Integer. Using the local runner and writing to Postgres this works
>> fine.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 09:20:20 - General - 	... 2 more
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>> 2022/09/02 09:20:20 - General - 	... 17 more
>>
>> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>>
>> Hi Fabian,
>>
>> Merge join does not require your data to be sorted when executing on Beam
>>
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>
>> Cheers,
>> Hans
>>
>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <po...@mercadu.de> wrote:
>>
>>> Good morning Matt,
>>>
>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so
>>> the Merge Join transform is not an option. I guess I'll have to use
>>> temporary BigQuery tables to handle this. Those pipelines are all bounded,
>>> so this is an option. Or is there an easy option to sort things when
>>> running on Beam?
>>>
>>> I'll create a Jira ticket, no problem.
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>> On 01.09.2022 at 19:11, Matt Casters <ma...@neo4j.com> wrote:
>>>
>>> Hi Fabian,
>>>
>>> Joining rows is indeed the exception in Beam.  I would suggest you use
>>> the Merge Join
>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>>> transform.
>>> For unbounded pipelines (never ending) that transform will be handled
>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>> correctly.
>>> If you don't mind, please create a JIRA case so we can create a similar
>>> handler for the Cartesian product use-case.
>>> The code usually is non-trivial in the massively parallel world but quite
>>> doable ;-)
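For reference, the "Join rows (cartesian product)" transform discussed in this thread pairs every row of one input with every row of the other. A minimal in-memory Java sketch of that semantics follows; the CartesianProduct class and its cross method are hypothetical illustrations, not Hop code, and they say nothing about how a Beam handler for this use-case would distribute the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper illustrating what a cartesian-product join computes on bounded input.
final class CartesianProduct {
  // Combines every left element with every right element, in left-major order.
  // Output size is left.size() * right.size(), so both inputs must fit in memory here.
  static <A, B, R> List<R> cross(List<A> left, List<B> right, BiFunction<A, B, R> combine) {
    List<R> out = new ArrayList<>(left.size() * right.size());
    for (A a : left) {
      for (B b : right) {
        out.add(combine.apply(a, b));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> rows = cross(List.of(1, 2), List.of("x", "y", "z"), (n, s) -> n + s);
    System.out.println(rows); // prints [1x, 1y, 1z, 2x, 2y, 2z]
  }
}
```

The output grows with the product of both input sizes, which is one reason a parallel implementation needs more care than the local, single-process one.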
>>>
>>> All the best,
>>> Matt
>>>
>>>
>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <po...@mercadu.de> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've hit the next problem, this time something I thought I had tested
>>>> on Beam before: A pipeline containing a "Join rows (cartesian product)"
>>>> transform with input from two sources, loops forever when run via
>>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>>
>>>> While running it on Beam-Direct I've attached a debugger and can see
>>>> that it is stuck in the while loop at JoinRows.java:486
>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>> I've tried using a GCS temp directory and swapped the "Main transform to
>>>> read from" but none of those helped.
>>>>
>>>> Is this transform incompatible with Beam? If so, what could I use
>>>> instead?
>>>>
>>>> cheers
>>>>
>>>> Fabian
>>>>
>>>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
>>> *✉   *matt.casters@neo4j.com
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.casters@neo4j.com

Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Fabian Peters <po...@mercadu.de>.
Hi Matt,

Thanks for your quick reply!

The "Write to log" transform actually works fine for me with the local, Beam-Direct and Dataflow runners.

I removed the "Write to log" transforms from my test case and replaced the last one with a Beam Output transform. The issue remains the same; see the stack trace below…

cheers

Fabian


2022/09/02 15:05:06 - Hop - Pipeline opened.
2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
2022/09/02 15:05:06 - Hop - Started the pipeline execution.
2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name 'join-spike'
2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) : Select values, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid agents, gets data from Select values
2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
2022/09/02 15:05:14 - join-spike - ERROR: org.apache.hop.core.exception.HopException: 
2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner Direct
2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error converting Hop data to string lines
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
2022/09/02 15:05:14 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 15:05:14 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting Hop data to string lines
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
2022/09/02 15:05:14 - join-spike - 	... 2 more
2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException: Error converting Hop data to string lines
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopException: 
2022/09/02 15:05:14 - join-spike - Error getting String from field site_id Integer on index 1 in input: [id Integer], [site_id Integer], [site_name String], [telephone String], [agent_name String], native value found: Mbabane
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown Source)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 15:05:14 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 15:05:14 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 15:05:14 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException: 
2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
2022/09/02 15:05:14 - join-spike - 	at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
2022/09/02 15:05:14 - join-spike - 	... 13 more
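The key line in this trace is "Error getting String from field site_id Integer on index 1 ... native value found: Mbabane". As the message itself suggests, a Hop row is a positional array of values read through its row metadata, and position 1 is declared Integer but holds a String. The plain-Java sketch below uses hypothetical FieldMeta and RowLayoutCheck classes (not Hop's API) to show a check that reports the first position where a value's runtime class disagrees with the declared type. It assumes, as the ClassCastException earlier in the thread indicates, that Hop Integer fields carry java.lang.Long values; the row data is made up for illustration.

```java
import java.util.List;

// Hypothetical stand-in for one entry of Hop's row metadata (not Hop's value-meta API).
final class FieldMeta {
  final String name;
  final Class<?> nativeType;

  FieldMeta(String name, Class<?> nativeType) {
    this.name = name;
    this.nativeType = nativeType;
  }
}

final class RowLayoutCheck {
  // Returns the index of the first value whose runtime class differs from the declared
  // native type, or -1 if all values match. Nulls are accepted for any field.
  static int firstMismatch(List<FieldMeta> layout, Object[] row) {
    if (layout.size() != row.length) {
      throw new IllegalArgumentException(
          "layout has " + layout.size() + " fields but row has " + row.length + " values");
    }
    for (int i = 0; i < row.length; i++) {
      if (row[i] != null && !layout.get(i).nativeType.isInstance(row[i])) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // Field layout copied from the log line; Integer fields are assumed to hold java.lang.Long.
    List<FieldMeta> layout = List.of(
        new FieldMeta("id", Long.class),
        new FieldMeta("site_id", Long.class),
        new FieldMeta("site_name", String.class),
        new FieldMeta("telephone", String.class),
        new FieldMeta("agent_name", String.class));
    // Illustrative row in which the site name has landed in the site_id slot.
    Object[] row = {1L, "Mbabane", 7L, "555-0100", "Agent 1"};
    int bad = firstMismatch(layout, row);
    if (bad >= 0) {
      // prints: field site_id at index 1 holds a String
      System.out.println("field " + layout.get(bad).name + " at index " + bad
          + " holds a " + row[bad].getClass().getSimpleName());
    }
  }
}
```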


> On 02.09.2022 at 13:19, Matt Casters <ma...@neo4j.com> wrote:
> 
> Hi Fabian,
> 
> If you remove the "Write to log" transforms the pipeline will work.  We typically don't use that transform as the log ends up on a Spark/Flink node somewhere where you can't see the information anyway.  It's the main reason why I'm working on HOP-4024.
> 
> Also, just as a reference, there is an example in the samples project under beam/pipelines, called complex.hpl, which contains a merge join.
> 
> Best of luck!
> Matt
> 
> 
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <post@mercadu.de> wrote:
> Hi once more,
> 
> I feel a little bit like I've started my slow descent into madness. What I assumed was a configuration error now looks like it's not. I've put together a minimal test case project <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour I'm seeing in my project. I'd be very grateful if someone could take a look and tell me whether I'm taking a wrong turn somewhere or whether there really is a bug.
> 
> What I'm trying to achieve: There are records of one entity (agent) that come with a site_id field. Some site_id values do not correspond to a valid site. The agent records are to be joined with valid records of the second entity (site), so that only agent records with an existing site_id get to proceed.
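In relational terms this is an inner join of agents to sites on site_id, keeping only agents whose site_id exists. A minimal plain-Java sketch of that logic follows, assuming unique site IDs and using hypothetical Agent, Site and AgentSiteJoin classes (not the test project's actual fields, and not Hop code). Indexing the sites by ID before probing with each agent means neither input needs to be sorted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified versions of the two entities in the test case.
final class Site {
  final long id;
  final String name;

  Site(long id, String name) {
    this.id = id;
    this.name = name;
  }
}

final class Agent {
  final long id;
  final long siteId;
  final String name;

  Agent(long id, long siteId, String name) {
    this.id = id;
    this.siteId = siteId;
    this.name = name;
  }
}

final class AgentSiteJoin {
  // Inner join on agent.siteId == site.id. Sites are indexed by ID first, so neither
  // input has to be sorted. Assumes site IDs are unique (a later duplicate overwrites
  // an earlier one). Agents whose siteId has no matching site are dropped.
  static List<String> join(List<Agent> agents, List<Site> sites) {
    Map<Long, Site> sitesById = new HashMap<>();
    for (Site site : sites) {
      sitesById.put(site.id, site);
    }
    List<String> joined = new ArrayList<>();
    for (Agent agent : agents) {
      Site site = sitesById.get(agent.siteId);
      if (site != null) {
        joined.add(agent.name + "@" + site.name);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List<Site> sites = List.of(new Site(2, "San Jose"), new Site(1, "Mbabane"));
    List<Agent> agents = List.of(
        new Agent(10, 2, "Ann"), new Agent(11, 99, "Bo"), new Agent(12, 1, "Cy"));
    System.out.println(join(agents, sites)); // prints [Ann@San Jose, Cy@Mbabane]
  }
}
```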
> 
> The setup works fine with the local runner (even though the inputs are not sorted). With the Beam-Direct runner, the fields are getting mixed up, in this case the values from site_name end up in the site_id column.
> 
> cheers
> 
> Fabian
> 
> 2022/09/02 12:35:33 - Hop - Pipeline opened.
> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException: 
> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 12:35:33 - join-spike - 	... 2 more
> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException: 
> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 12:35:33 - join-spike - 	at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException: 
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
> 2022/09/02 12:35:33 - join-spike - 	at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
> 2022/09/02 12:35:33 - join-spike - 	... 15 more
> 
> 
> 
>> On 02.09.2022 at 09:48, Fabian Peters <post@mercadu.de> wrote:
>> 
>> Hi Hans,
>> 
>> That stack trace was due to user error and I'm glad it was. ;)
>> There was one "id" field too many coming into the merge join…
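A duplicated field name like this is easy to miss when two streams are joined. Here is a hypothetical pre-flight check in plain Java (FieldNameCheck is not part of Hop) that lists names occurring more than once across both input layouts. It assumes the join output places the two layouts side by side, so every reported name would appear twice in the joined row; the layouts in main are illustrative.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight check; not part of Hop.
final class FieldNameCheck {
  // Returns every field name that occurs more than once across the two input layouts,
  // in the order the duplicates are first found.
  static List<String> duplicateNames(List<String> left, List<String> right) {
    Set<String> seen = new HashSet<>();
    Set<String> duplicates = new LinkedHashSet<>();
    List<String> combined = new ArrayList<>(left);
    combined.addAll(right);
    for (String name : combined) {
      if (!seen.add(name)) {
        duplicates.add(name);
      }
    }
    return new ArrayList<>(duplicates);
  }

  public static void main(String[] args) {
    // Illustrative layouts: both inputs still carry an "id" field.
    List<String> agents = List.of("id", "site_id", "agent_name");
    List<String> sites = List.of("id", "site_name");
    System.out.println(duplicateNames(agents, sites)); // prints [id]
  }
}
```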
>> 
>> Sorry for the noise!
>> 
>> cheers
>> 
>> Fabian
>> 
>>> On 02.09.2022 at 09:31, Fabian Peters <post@mercadu.de> wrote:
>>> 
>>> Hi Hans,
>>> 
>>> Thanks! I probably read that at some point, but the "Notice" modal popping up when closing the "Merge join" dialogue probably convinced me otherwise: "If the incoming data is not sorted on the specified keys, the output results may not be correct. We recommend sorting the incoming data within the pipeline."
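The Notice quoted above describes the classic merge join: both inputs are read once in key order, so the result is only reliable when both are sorted on the join key. Hans's link says this requirement does not apply on the Beam engines, and the sketch below does not model that Beam handler. It is a plain-Java illustration (the SortedMergeJoin class is hypothetical) of why unsorted input silently loses matches in the classic algorithm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a classic sort-merge inner join on keys; not Hop's code.
final class SortedMergeJoin {
  // Walks both lists once, advancing whichever side has the smaller key.
  // Correct only if both lists are sorted ascending with unique keys per side:
  // the cursors never move backwards, so on unsorted input a match that lies
  // "behind" a cursor is silently skipped.
  static List<Long> matchingKeys(List<Long> left, List<Long> right) {
    List<Long> matches = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = Long.compare(left.get(i), right.get(j));
      if (cmp == 0) {
        matches.add(left.get(i));
        i++;
        j++;
      } else if (cmp < 0) {
        i++;
      } else {
        j++;
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    List<Long> siteIds = List.of(1L, 3L);
    System.out.println(matchingKeys(List.of(1L, 2L, 3L), siteIds)); // prints [1, 3]
    System.out.println(matchingKeys(List.of(3L, 1L, 2L), siteIds)); // prints [3]: key 1 is missed
  }
}
```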
>>> 
>>> I'm testing it in my pipeline now and am getting a stack trace (see below). The "site_id" field is from an "Avro decode" transform and is a plain Integer. Using the local runner and writing to Postgres this works fine.
>>> 
>>> cheers
>>> 
>>> Fabian
>>> 
>>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException: 
>>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>> 2022/09/02 09:20:20 - General - 
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>>> 2022/09/02 09:20:20 - General - 	... 2 more
>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException: 
>>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>>> 2022/09/02 09:20:20 - General - 
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>>> 2022/09/02 09:20:20 - General - 	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> 2022/09/02 09:20:20 - General - 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> 2022/09/02 09:20:20 - General - 	at java.base/java.lang.Thread.run(Thread.java:829)
>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>>> 2022/09/02 09:20:20 - General - 	at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>>> 2022/09/02 09:20:20 - General - 	... 17 more
>>> 
>>>> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>>>> 
>>>> Hi Fabian,
>>>> 
>>>> Merge join does not require your data to be sorted when executing on Beam 
>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>>> 
>>>> Cheers,
>>>> Hans
>>>> 
>>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <post@mercadu.de> wrote:
>>>> Good morning Matt,
>>>> 
>>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so the Merge Join transform is not an option. I guess I'll have to use temporary BigQuery tables to handle this. Those pipelines are all bounded, so this is an option. Or is there an easy option to sort things when running on Beam?
>>>> 
>>>> I'll create a Jira ticket, no problem.
>>>> 
>>>> cheers
>>>> 
>>>> Fabian
>>>> 
>>>>> On 01.09.2022 at 19:11, Matt Casters <matt.casters@neo4j.com> wrote:
>>>>> 
>>>>> Hi Fabian,
>>>>> 
>>>>> Joining rows is indeed the exception in Beam.  I would suggest you use the Merge Join <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> transform.
>>>>> For unbounded pipelines (never ending) that transform will be handled <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java> correctly. 
>>>>> If you don't mind, please create a JIRA case so we can create a similar handler for the Cartesian product use-case.
>>>>> The code usually is non-trivial in the massively parallel world but quite doable ;-)
>>>>> 
>>>>> All the best,
>>>>> Matt
>>>>> 
>>>>> 
>>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <post@mercadu.de> wrote:
>>>>> Hi all,
>>>>> 
>>>>> I've hit the next problem, this time something I thought I had tested on Beam before: A pipeline containing a "Join rows (cartesian product)" transform with input from two sources, loops forever when run via Beam-Direct or Dataflow. It works fine using the local runner.
>>>>> 
>>>>> While running it on Beam-Direct I've attached a debugger and can see that it is stuck in the while loop at JoinRows.java:486 <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>. I've tried using a GCS temp directory and swapped the "Main transform to read from" but none of those helped.
>>>>> 
>>>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>>>> 
>>>>> cheers
>>>>> 
>>>>> Fabian
>>>>> 
>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Neo4j Chief Solutions Architect
>>>>> ✉   matt.casters@neo4j.com
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
> 
> 
> -- 
> Neo4j Chief Solutions Architect
> ✉   matt.casters@neo4j.com
> 
> 
> 
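A Cartesian product pairs every row of one input with every row of the other, so at least one side has to be read completely before the first output row can exist. The plain-Java sketch below makes that dependency visible; it uses hypothetical names, has no Hop or Beam dependency, and is not the JoinRows code. On Beam, one common way to hand a step a complete, bounded input is a side input such as View.asList(); whether a Hop handler for this use case would work that way is an assumption, not something stated in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class CartesianProduct {

  // Hypothetical pair type holding one row from each input.
  record Pair<A, B>(A left, B right) {}

  // Pairs every row of `left` with every row of `right`.
  // The inner loop replays `right` once per left row, so `right` must be
  // completely read (materialized) before the first output row can be emitted.
  static <A, B> List<Pair<A, B>> crossJoin(List<A> left, List<B> right) {
    List<Pair<A, B>> out = new ArrayList<>();
    for (A a : left) {
      for (B b : right) {
        out.add(new Pair<>(a, b));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> agents = List.of("Alice", "Bob");
    List<Integer> sites = List.of(1, 2, 3);
    List<Pair<String, Integer>> rows = crossJoin(agents, sites);
    System.out.println(rows.size() + " rows"); // prints "6 rows" (2 x 3)
    rows.forEach(System.out::println);
  }
}
```

The output size is the product of the input sizes, which is one reason such a join is usually kept to a small, bounded side.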


Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Matt Casters <ma...@neo4j.com>.
Hi Fabian,

If you remove the "Write to log" transforms the pipeline will work.  We
typically don't use that transform as the log ends up on a Spark/Flink node
somewhere where you can't see the information anyway.  It's the main reason
why I'm working on HOP-4024.

Also, just as a reference, there is an example in the samples project under
beam/pipelines called complex.hpl which contains a merge join.

Best of luck!
Matt
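One way to see why a keyed join need not depend on sorted input (Hans's point about Merge Join on Beam) is that it can be done by indexing one side by key and probing it with the other. The plain-Java sketch below applies that to Fabian's agent/site case, keeping only agents whose site_id exists. It is an illustration under assumptions (hypothetical record types and field names, Java 16+ for records) and not Hop's BeamMergeJoinTransformHandler.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnsortedInnerJoin {

  // Hypothetical record types standing in for the "agent" and "site" rows.
  record Agent(long agentId, String agentName, long siteId) {}
  record Site(long siteId, String siteName) {}
  record AgentWithSite(long agentId, String agentName, long siteId, String siteName) {}

  // Inner join on siteId: index the site side by key, then probe with each agent.
  // Neither input needs to be sorted; agents whose siteId has no match are dropped.
  static List<AgentWithSite> innerJoin(List<Agent> agents, List<Site> sites) {
    Map<Long, List<Site>> sitesById = new HashMap<>();
    for (Site s : sites) {
      sitesById.computeIfAbsent(s.siteId(), k -> new ArrayList<>()).add(s);
    }
    List<AgentWithSite> out = new ArrayList<>();
    for (Agent a : agents) {
      for (Site s : sitesById.getOrDefault(a.siteId(), List.of())) {
        out.add(new AgentWithSite(a.agentId(), a.agentName(), a.siteId(), s.siteName()));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Site> sites = List.of(new Site(2, "San Jose"), new Site(1, "Berlin"));
    List<Agent> agents = List.of(
        new Agent(10, "Alice", 1),
        new Agent(11, "Bob", 99), // no such site: filtered out
        new Agent(12, "Carol", 2));
    innerJoin(agents, sites).forEach(System.out::println);
  }
}
```

Output order follows the agent input, and each output row carries site_name in its own named slot, so it cannot end up in the site_id column.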


On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <po...@mercadu.de> wrote:

> Hi once more,
>
> I feel a little bit like I've started my slow descent into madness. What I
> assumed was a configuration error now looks like it's not. I've put
> together a minimal test case project
> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour
> I'm seeing in my project. I'd be very grateful if someone could take a look
> and tell me whether I'm taking a wrong turn somewhere or whether there
> really is a bug.
>
> What I'm trying to achieve: There are records of one entity (agent) that
> come with a site_id field. Some site_id values do not correspond to a valid
> site. The agent records are to be joined with valid records of the second
> entity (site), so that only agent records with an existing site_id get to
> proceed.
>
> The setup works fine with the local runner (even though the inputs are not
> sorted). With the Beam-Direct runner, the fields are getting mixed up: in
> this case the values from site_name end up in the site_id column.
>
> cheers
>
> Fabian
>
> 2022/09/02 12:35:33 - Hop - Pipeline opened.
> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException:
> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 12:35:33 - join-spike -  at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 12:35:33 - join-spike -  ... 2 more
> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 12:35:33 - join-spike -  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 12:35:33 - join-spike -  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 12:35:33 - join-spike -  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 12:35:33 - join-spike -  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 12:35:33 - join-spike -  at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
> 2022/09/02 12:35:33 - join-spike -  ... 15 more
>
>
>
> On 02.09.2022 at 09:48, Fabian Peters <po...@mercadu.de> wrote:
>
> Hi Hans,
>
> That stack trace was due to user error and I'm glad it was. ;)
> There was one "id" field too many coming into the merge join…
>
> Sorry for the noise!
>
> cheers
>
> Fabian
>
> On 02.09.2022 at 09:31, Fabian Peters <po...@mercadu.de> wrote:
>
> Hi Hans,
>
> Thanks! I probably read that at some point, but the "Notice" modal that pops
> up when closing the "Merge join" dialog seems to have convinced me otherwise:
> "If the incoming data is not sorted on the specified keys, the output
> results may not be correct. We recommend sorting the incoming data within
> the pipeline."
>
> I'm testing it in my pipeline now and am getting a stack trace (see
> below). The "site_id" field comes from an "Avro decode" transform and is a
> plain Integer. With the local runner writing to Postgres, this works
> fine.
>
> cheers
>
> Fabian
>
> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 09:20:20 - General -  at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 09:20:20 - General -  ... 2 more
> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General -
> 2022/09/02 09:20:20 - General -  at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 09:20:20 - General -  at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General -  at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
> 2022/09/02 09:20:20 - General -  ... 17 more
>
> On 02.09.2022 at 08:58, Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
> Hi Fabian,
>
> Merge join does not require your data to be sorted when executing on Beam
>
> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>
> Cheers,
> Hans
>
> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <po...@mercadu.de> wrote:
>
>> Good morning Matt,
>>
>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so
>> the Merge Join transform is not an option. I guess I'll have to use
>> temporary BigQuery tables to handle this. Those pipelines are all bounded,
>> so this is an option. Or is there an easy option to sort things when
>> running on Beam?
>>
>> I'll create a Jira ticket, no problem.
>>
>> cheers
>>
>> Fabian
>>
>> On 01.09.2022 at 19:11, Matt Casters <ma...@neo4j.com> wrote:
>>
>> Hi Fabian,
>>
>> Joining rows is indeed the exception in Beam.  I would suggest you use
>> the Merge Join
>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>> transform.
>> For unbounded pipelines (never ending) that transform will be handled
>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>> correctly.
>> If you don't mind, please create a JIRA case so we can create a similar
>> handler for the Cartesian product use-case.
>> The code usually is non-trivial in the massively parallel world but quite
>> doable ;-)
>>
>> All the best,
>> Matt
>>
>>
>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <po...@mercadu.de> wrote:
>>
>>> Hi all,
>>>
>>> I've hit the next problem, this time something I thought I had tested on
>>> Beam before: A pipeline containing a "Join rows (cartesian product)"
>>> transform with input from two sources, loops forever when run via
>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>
>>> While running it on Beam-Direct I've attached a debugger and can see
>>> that it is stuck in the while loop at JoinRows.java:486
>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>> I've tried using a GCS temp directory and swapped the "Main transform to
>>> read from" but none of those helped.
>>>
>>> Is this transform incompatible with Beam? If so, what could I use
>>> instead?
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>>
>>
>>
>> --
>> Neo4j Chief Solutions Architect
>> ✉   matt.casters@neo4j.com
>>
>>
>>
>>
>>
>
>
>
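Both errors in this thread (a String "San Jose" found where an Integer site_id was expected, and Fabian's extra "id" field) are consistent with rows whose values and metadata disagree about field positions. Hop keeps row values in an array and resolves fields by position through separate row metadata, and it holds Integer values as java.lang.Long, as the ClassCastException in the traces shows. The plain-Java sketch below mimics only that positional lookup, with hypothetical names; it is not Hop's IRowMeta or ValueMetaBase code, and which side carried the extra field in Fabian's pipeline is not stated, so the sketch simply puts it in the metadata.

```java
import java.util.List;

public class RowLayoutMismatch {

  // Simplified stand-in for a row: values live in an Object[] and a separate,
  // hypothetical list of field names says which position holds which field.
  static Long getInteger(List<String> fieldNames, Object[] row, String field) {
    int index = fieldNames.indexOf(field);
    if (index < 0) {
      throw new IllegalArgumentException("Unknown field: " + field);
    }
    // Integer fields are held as java.lang.Long, hence the cast to Long.
    return (Long) row[index];
  }

  public static void main(String[] args) {
    Object[] row = {10L, 2L, "San Jose"}; // agent_id, site_id, site_name

    // Metadata matching the row layout: site_id resolves to position 1.
    List<String> matching = List.of("agent_id", "site_id", "site_name");
    System.out.println("site_id = " + getInteger(matching, row, "site_id"));

    // Metadata describing one extra "id" field the row does not carry:
    // site_id now resolves to position 2, which holds the site_name String.
    List<String> shifted = List.of("id", "agent_id", "site_id", "site_name");
    try {
      getInteger(shifted, row, "site_id");
    } catch (ClassCastException e) {
      System.out.println("ClassCastException: " + e.getMessage());
    }
  }
}
```

A single extra or missing field is enough to move every later value one slot, which matches the symptom of site_name values showing up under site_id.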

-- 
Neo4j Chief Solutions Architect
✉   matt.casters@neo4j.com

Re: Merge join issue on Beam [was: Join rows transform loops forever on Beam]

Posted by Matt Casters <ma...@neo4j.com>.
Hi Fabian,

If you remove the "Write to log" transforms the pipeline will work.  We
typically don't use that transform as the log ends up on a Spark/Flink node
somewhere where you can't see the information anyway.  It's the main reason
why I'm working on HOP-4024.

Also, just as a reference, there is an example in samples project under
beam/pipelines called complex.hpl which contains a merge join.

Best of luck!
Matt


On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <po...@mercadu.de> wrote:

> Hi once more,
>
> I feel a little bit like I've started my slow descent into madness. What I
> supposed to be a configuration error now looks like it's not. I've put
> together a minimal test case project
> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour
> I'm seeing in my project. I'd be very grateful if someone could take a look
> and tell me whether I'm taking a wrong turn somewhere or whether there
> really is a bug.
>
> What I'm trying to achieve: There are records of one entity (agent) that
> come with a site_id field. Some site_id values do not correspond to a valid
> site. The agent records are to be joined with valid records of the second
> entity (site), so that only agent records with an existing site_id get to
> proceed.
>
> The setup works fine with the local runner (even though the inputs are not
> sorted). With the Beam-Direct runner, the fields are getting mixed up, in
> this case the values from site_name end up in the site_id column.
>
> cheers
>
> Fabian
>
> 2022/09/02 12:35:33 - Hop - Pipeline opened.
> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name
> 'join-spike'
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
> Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
> Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
> Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam
> Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 12:35:33 - join-spike - ERROR:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner
> Direct
> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error
> executing TransformFn
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 12:35:33 - join-spike -  at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 12:35:33 - join-spike -  ... 2 more
> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException:
> Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
> 2022/09/02 12:35:33 - join-spike - Caused by:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a
> single threaded pipeline
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [San Jose] does
> not correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
> 2022/09/02 12:35:33 - join-spike -  at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 12:35:33 - join-spike -  at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 12:35:33 - join-spike -  at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by:
> org.apache.hop.core.exception.HopValueException:
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [San Jose] does
> not correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike -
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 12:35:33 - join-spike -  at
> org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
> 2022/09/02 12:35:33 - join-spike -
> at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
> 2022/09/02 12:35:33 - join-spike -  ... 15 more
>
>
>
> Am 02.09.2022 um 09:48 schrieb Fabian Peters <po...@mercadu.de>:
>
> Hi Hans,
>
> That stack trace was due to user error and I'm glad it was. ;)
> There was one "id" field to many coming into the merge join…
>
> Sorry for the noise!
>
> cheers
>
> Fabian
>
> Am 02.09.2022 um 09:31 schrieb Fabian Peters <po...@mercadu.de>:
>
> Hi Hans,
>
> Thanks! I probably read that at some point, but the "Notice" modal popping
> up when closing the "Merge join" dialogue probably convinced me otherwise:
> "If the incoming data is not sorted on the specified keys, the output
> results may not be correct. We recommend sorting the incoming data within
> the pipeline."
>
> I'm testing it in my pipeline now and am getting a stack trace (see
> below). The "site_id" field is from an "Avro decode" transform and is a
> plain Integer. Using the local runner and writing to Postgres this works
> fine.
>
> cheers
>
> Fabian
>
> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
> 2022/09/02 09:20:20 - General - ERROR:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error
> converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -
> 2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 09:20:20 - General -  at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 09:20:20 - General -  at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 09:20:20 - General -  ... 2 more
> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General -
> 2022/09/02 09:20:20 - General -  at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
> 2022/09/02 09:20:20 - General -  at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 09:20:20 - General -  at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 09:20:20 - General -  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 09:20:20 - General -  at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General -  at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
> 2022/09/02 09:20:20 - General -  ... 17 more
>
> On 02.09.2022 at 08:58, Hans Van Akelyen <hans.van.akelyen@gmail.com> wrote:
>
> Hi Fabian,
>
> Merge Join does not require your data to be sorted when executing on Beam:
>
> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>
> Cheers,
> Hans
>
> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <po...@mercadu.de> wrote:
>
>> Good morning Matt,
>>
>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so
>> the Merge Join transform is not an option. I guess I'll have to use
>> temporary BigQuery tables to handle this. Those pipelines are all bounded,
>> so that approach would work. Or is there an easy way to sort things when
>> running on Beam?
>>
>> I'll create a Jira ticket, no problem.
>>
>> cheers
>>
>> Fabian
>>
>> On 01.09.2022 at 19:11, Matt Casters <ma...@neo4j.com> wrote:
>>
>> Hi Fabian,
>>
>> Joining rows is indeed the exception in Beam. I would suggest you use
>> the Merge Join
>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>> transform.
>> For unbounded pipelines (never ending) that transform will be handled
>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>> correctly.
>> If you don't mind, please create a JIRA case so we can create a similar
>> handler for the Cartesian product use case.
>> The code is usually non-trivial in a massively parallel world, but quite
>> doable ;-)
>>
>> All the best,
>> Matt
>>
>>
>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <po...@mercadu.de> wrote:
>>
>>> Hi all,
>>>
>>> I've hit the next problem, this time something I thought I had tested on
>>> Beam before: a pipeline containing a "Join rows (cartesian product)"
>>> transform with input from two sources loops forever when run via
>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>
>>> While running it on Beam-Direct, I attached a debugger and can see
>>> that it is stuck in the while loop at JoinRows.java:486
>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>> I've tried using a GCS temp directory and swapping the "Main transform to
>>> read from", but neither helped.
>>>
>>> Is this transform incompatible with Beam? If so, what could I use
>>> instead?
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>>
>>
>>
>> --
>> Neo4j Chief Solutions Architect
>> *✉   *matt.casters@neo4j.com
>>
>>
>>
>>
>>
>
>
>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.casters@neo4j.com