Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/31 18:57:00 UTC
[jira] [Assigned] (BEAM-12005) getting issue to load file into
database (java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.beam.sdk.values.KV)
[ https://issues.apache.org/jira/browse/BEAM-12005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles reassigned BEAM-12005:
--------------------------------------
Assignee: (was: Aizhamal Nurmamat kyzy)
> getting issue to load file into database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12005
> URL: https://issues.apache.org/jira/browse/BEAM-12005
> Project: Beam
> Issue Type: Bug
> Components: io-java-jdbc
> Affects Versions: 2.28.0
> Reporter: Gaurav Khandelwal
> Priority: P2
> Labels: ClassCastException, JdbcIO, MySQL, apache-beam
>
> Hi Team,
> We are getting the following error:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
>
> Our goal is to load a file into a database. We tried the following approach:
> {code:java}
> @SuppressWarnings("unchecked")
> public static void main(String[] args) {
>     PCSI02AOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
>     Pipeline p = Pipeline.create(options);
>     PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
>         .apply(ParDo.of(new GetPlanID()))
>         .apply("Format Result",
>             MapElements.into(TypeDescriptors.strings())
>                 .via((KV<String, Integer> ABC) -> ABC.getKey() + "," + ABC.getValue()));
>     data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
>             .withUsername("abc")
>             .withPassword("abc123"))
>         .withCoder(StringUtf8Coder.of())
>         .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
>             @Override
>             public void setParameters(KV<String, Iterable<Integer>> element,
>                     PreparedStatement preparedStatement) throws Exception {
>                 String[] range = element.getKey().split(",");
>                 preparedStatement.setInt(1, Integer.parseInt(range[0]));
>             }
>         }).withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
>         .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
>             ObjectMapper mapper = new ObjectMapper();
>             ArrayNode arrayNode = mapper.createArrayNode();
>             for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
>                 try {
>                     ObjectNode objectNode = mapper.createObjectNode();
>                     objectNode.put("column_name", resultSet.getMetaData().getColumnName(i));
>                     objectNode.put("value", resultSet.getString(i));
>                     arrayNode.add(objectNode);
>                 } catch (Exception e) {
>                     throw e;
>                 }
>             }
>             return mapper.writeValueAsString(arrayNode);
>         })
>     );
>     State result = p.run().waitUntilFinish();
>     System.out.println(result);
> }
>
> private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         String[] data = c.element().split(",");
>         Integer plankey = Integer.parseInt(data[0]);
>         String planid = data[1];
>         c.output(KV.of(planid, plankey));
>     }
> }{code}
>
> Error:
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
> at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1)
> at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910){code}
>
> Could you suggest how we can resolve this? If there is a reference for this use case, kindly share a link or snippets.
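
A possible reading of the trace, not confirmed anywhere in this thread: the "Format Result" step uses MapElements.into(TypeDescriptors.strings()), so the elements of data1 are Strings, while the PreparedStatementSetter passed to readAll() is declared for KV<String, Iterable<Integer>>. Because data1 is a raw PCollection and unchecked warnings are suppressed, the mismatch compiles and only surfaces at runtime, when the setter receives a String. The sketch below reproduces that mechanism in plain Java without Beam. Setter and runWithRawType are hypothetical stand-ins for illustration only, and Map.Entry stands in for KV.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class RawTypeCastDemo {

    // Hypothetical stand-in for JdbcIO.PreparedStatementSetter<T>.
    public interface Setter<T> {
        String describe(T element);
    }

    // Passes an arbitrary element through a raw reference to a typed setter,
    // the way a raw PCollection lets a mismatched transform be applied.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String runWithRawType(Object element) {
        // The setter is declared for key/value pairs, like the one in the report.
        Setter<Map.Entry<String, Integer>> setter =
                new Setter<Map.Entry<String, Integer>>() {
                    @Override
                    public String describe(Map.Entry<String, Integer> e) {
                        return e.getKey();
                    }
                };

        // Dropping the type parameter disables the compile-time check.
        Setter raw = setter;
        try {
            // The compiler-generated bridge method casts the argument to
            // Map.Entry here, which fails if the element is a String.
            return "ok: " + raw.describe(element);
        } catch (ClassCastException ex) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A String element, as produced by a step that maps into strings.
        System.out.println(runWithRawType("1,PLAN_A"));
        // A key/value element, matching the setter's declared type.
        System.out.println(runWithRawType(new SimpleEntry<>("1,PLAN_A", 1)));
    }
}
```

If this reading is right, two directions seem worth trying: declare data1 with its full type (PCollection<String>) so the compiler reports the mismatch, and then either type the read as JdbcIO.<String, String>readAll() with a PreparedStatementSetter<String>, or apply readAll() to the KV collection before the "Format Result" step.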
--
This message was sent by Atlassian Jira
(v8.3.4#803005)