You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Ismaël Mejía (Jira)" <ji...@apache.org> on 2020/01/28 15:29:00 UTC
[jira] [Resolved] (BEAM-4409) NoSuchMethodException reading from
JmsIO
[ https://issues.apache.org/jira/browse/BEAM-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía resolved BEAM-4409.
--------------------------------
Fix Version/s: 2.20.0
Resolution: Fixed
> NoSuchMethodException reading from JmsIO
> ----------------------------------------
>
> Key: BEAM-4409
> URL: https://issues.apache.org/jira/browse/BEAM-4409
> Project: Beam
> Issue Type: Bug
> Components: io-java-jms
> Affects Versions: 2.4.0
> Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ
> Reporter: Edward Pricer
> Priority: Major
> Fix For: 2.20.0
>
>
> Running with the DirectRunner, and reading from a queue with JmsIO as an unbounded source will produce a NoSuchMethodException. This occurs as the UnboundedReadEvaluatorFactory.UnboundedReadEvaluator attempts to clone the JmsCheckpointMark with the default (Avro) coder.
> The following trivial code on the reader side reproduces the error (DirectRunner must be in path). The messages on the queue for this test case were simple TextMessages. I found this exception is triggered more readily when messages are published rapidly (~200/second)
> {code:java}
> Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
> // read from the queue
> ConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
> PCollection<String> inputStrings = p.apply("Read from queue",
> JmsIO.<String>readMessage() .withConnectionFactory(factory)
> .withQueue("somequeue") .withCoder(StringUtf8Coder.of())
> .withMessageMapper((JmsIO.MessageMapper<String>) message ->
> ((TextMessage) message).getText()));
> // decode
> PCollection<String> asStrings = inputStrings.apply("Decode Message", ParDo.of(new DoFn<String, String>() { @ProcessElement public
> void processElement(ProcessContext context) {
> System.out.println(context.element());
> context.output(context.element()); } }));
> p.run();
> {code}
> Stack trace:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>() at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219) at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318) at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170) at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122) at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105) at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99) at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148) at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194) at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124) at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161) at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>() at java.lang.Class.getConstructor0(Class.java:3082) at java.lang.Class.getDeclaredConstructor(Class.java:2178) at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> {code}
>
> And a more contrived example of how to produce the exception:
> {code:java}
> package org.apache.beam.sdk.io.jms;
> import org.apache.activemq.command.ActiveMQTextMessage;
> import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.CoderUtils;
> final class CoderErrorExample { public static void main(String[] args) throws Exception {
> Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
> JmsCheckpointMark checkpointMark = new JmsCheckpointMark();
> checkpointMark.addMessage(new ActiveMQTextMessage());
> CoderUtils.clone(coder, checkpointMark); // from org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader
> }
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)