Posted to commits@beam.apache.org by "Edward Pricer (JIRA)" <ji...@apache.org> on 2018/05/25 03:01:00 UTC

[jira] [Created] (BEAM-4409) NoSuchMethodException reading from JmsIO

Edward Pricer created BEAM-4409:
-----------------------------------

             Summary: NoSuchMethodException reading from JmsIO
                 Key: BEAM-4409
                 URL: https://issues.apache.org/jira/browse/BEAM-4409
             Project: Beam
          Issue Type: Bug
          Components: io-java-jms
    Affects Versions: 2.4.0
         Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ
            Reporter: Edward Pricer
            Assignee: Jean-Baptiste Onofré


Reading from a queue with JmsIO as an unbounded source under the DirectRunner produces a NoSuchMethodException. This occurs when UnboundedReadEvaluatorFactory.UnboundedReadEvaluator attempts to clone the JmsCheckpointMark with the default (Avro) coder: per the stack trace below, decoding fails when Avro tries to instantiate javax.jms.Message through a no-argument constructor, which does not exist.

The following trivial code on the reader side reproduces the error (the DirectRunner must be on the classpath). The messages on the queue for this test case were simple TextMessages. I found that the exception is triggered more readily when messages are published rapidly (~200/second).
{code:java}
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// read from the queue
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

PCollection<String> inputStrings = p.apply("Read from queue",
    JmsIO.<String>readMessage()
        .withConnectionFactory(factory)
        .withQueue("somequeue")
        .withCoder(StringUtf8Coder.of())
        .withMessageMapper((JmsIO.MessageMapper<String>) message ->
            ((TextMessage) message).getText()));

// decode
PCollection<String> asStrings = inputStrings.apply("Decode Message",
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        System.out.println(context.element());
        context.output(context.element());
      }
    }));
p.run();
{code}
Stack trace:
{code:java}
Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
    at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
    at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
    at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
    at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getDeclaredConstructor(Class.java:2178)
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
{code}
 

A more contrived example that produces the same exception:
{code:java}
package org.apache.beam.sdk.io.jms;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;

final class CoderErrorExample {
  public static void main(String[] args) throws Exception {
    Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
    JmsCheckpointMark checkpointMark = new JmsCheckpointMark();
    checkpointMark.addMessage(new ActiveMQTextMessage());
    // from org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader
    CoderUtils.clone(coder, checkpointMark);
  }
}
{code}
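The "Caused by" frames in the stack trace end in Class.getDeclaredConstructor being called for javax.jms.Message, which is an interface and therefore has no constructor to look up. The following is a minimal plain-Java sketch of that reflective failure, not Avro's or Beam's actual code. The class InterfaceConstructorDemo, the nested Message interface (a hypothetical stand-in for javax.jms.Message), TextMessageImpl, and tryNoArgConstructor are all names invented for illustration.

```java
import java.lang.reflect.Constructor;

public class InterfaceConstructorDemo {

    /** Hypothetical stand-in for an interface type such as javax.jms.Message. */
    public interface Message {
        String getText();
    }

    /** Concrete implementation with an implicit public no-argument constructor. */
    public static class TextMessageImpl implements Message {
        @Override
        public String getText() {
            return "hello";
        }
    }

    /**
     * Tries to instantiate the given type through its no-argument constructor.
     * Returns null on success, or the simple name of the reflective exception.
     */
    public static String tryNoArgConstructor(Class<?> type) {
        try {
            // An interface declares no constructors, so this lookup throws
            // NoSuchMethodException before any instance can be created.
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            ctor.newInstance();
            return null;
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Interface: the constructor lookup fails.
        System.out.println(tryNoArgConstructor(Message.class));
        // Concrete class: instantiation succeeds, so null is printed.
        System.out.println(tryNoArgConstructor(TextMessageImpl.class));
    }
}
```

The sketch suggests why the failure depends on the declared type rather than on the runtime message class: a concrete class such as ActiveMQTextMessage could be instantiated reflectively, but a decoder that only knows the declared interface type has nothing to construct.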
 

 


