You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Neil Kolban (Jira)" <ji...@apache.org> on 2020/01/08 05:19:00 UTC

[jira] [Commented] (BEAM-4409) NoSuchMethodException reading from JmsIO

    [ https://issues.apache.org/jira/browse/BEAM-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010378#comment-17010378 ] 

Neil Kolban commented on BEAM-4409:
-----------------------------------

What is the status of this issue?  I see its been open for 20 months.  I too am suffering with the exact same issue.

> NoSuchMethodException reading from JmsIO
> ----------------------------------------
>
>                 Key: BEAM-4409
>                 URL: https://issues.apache.org/jira/browse/BEAM-4409
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.4.0
>         Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ
>            Reporter: Edward Pricer
>            Priority: Major
>
> Running with the DirectRunner, and reading from a queue with JmsIO as an unbounded source will produce a NoSuchMethodException. This occurs as the UnboundedReadEvaluatorFactory.UnboundedReadEvaluator attempts to clone the JmsCheckpointMark with the default (Avro) coder.
> The following trivial code on the reader side reproduces the error (DirectRunner must be in path). The messages on the queue for this test case were simple TextMessages. I found this exception is triggered more readily when messages are published rapidly (~200/second)
> {code:java}
> Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
> // read from the queue
> ConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
> PCollection<String> inputStrings = p.apply("Read from queue",
> JmsIO.<String>readMessage() .withConnectionFactory(factory)
> .withQueue("somequeue") .withCoder(StringUtf8Coder.of())
> .withMessageMapper((JmsIO.MessageMapper<String>) message ->
> ((TextMessage) message).getText()));
> // decode 
> PCollection<String> asStrings = inputStrings.apply("Decode Message", ParDo.of(new DoFn<String, String>() { @ProcessElement public
> void processElement(ProcessContext context) {
> System.out.println(context.element());
> context.output(context.element()); } })); 
> p.run();
> {code}
> Stack trace:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>() at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219) at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318) at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170) at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122) at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105) at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99) at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148) at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194) at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124) at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161) at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>() at java.lang.Class.getConstructor0(Class.java:3082) at java.lang.Class.getDeclaredConstructor(Class.java:2178) at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> {code}
>  
> And a more contrived example of how to produce the exception:
> {code:java}
> package org.apache.beam.sdk.io.jms; 
> import org.apache.activemq.command.ActiveMQTextMessage; 
> import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.CoderUtils; 
> final class CoderErrorExample { public static void main(String[] args) throws Exception { 
>   Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
>   JmsCheckpointMark checkpointMark = new JmsCheckpointMark(); 
>   checkpointMark.addMessage(new ActiveMQTextMessage());
>   CoderUtils.clone(coder, checkpointMark); // from org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader
> } 
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)