Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/31 18:57:00 UTC

[jira] [Assigned] (BEAM-12005) getting issue to load file into database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)

     [ https://issues.apache.org/jira/browse/BEAM-12005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles reassigned BEAM-12005:
--------------------------------------

    Assignee:     (was: Aizhamal Nurmamat kyzy)

> getting issue to load file into database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12005
>                 URL: https://issues.apache.org/jira/browse/BEAM-12005
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jdbc
>    Affects Versions: 2.28.0
>            Reporter: Gaurav Khandelwal
>            Priority: P2
>              Labels: ClassCastException, JdbcIO, MySQL, apache-beam
>
> Hi Team,
> We are getting the error below:
>  org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
>   
>  Our goal is to load a file into a database. We tried the following approach:
> {code:java}
> @SuppressWarnings("unchecked")
> 	public static void main(String[] args) {
> 		PCSI02AOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
> 		Pipeline p = Pipeline.create(options);
> 		PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
> 					.apply(ParDo.of(new GetRatePlanID()))
> 					.apply("Format Result", 
> 							MapElements.into(TypeDescriptors.strings())
> 							.via((KV<String, Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));
> 		data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
> 				.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
> 						.create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
> 						.withUsername("abc")
> 						.withPassword("abc123"))
> 				.withCoder(StringUtf8Coder.of())
> 				.withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
> 					@Override
> 					public void setParameters(KV<String, Iterable<Integer>> element,
> 							PreparedStatement preparedStatement) throws Exception {
> 						String[] range = element.getKey().split(",");
> 						preparedStatement.setInt(1, Integer.parseInt(range[0]));
> 					}
> 				}).withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
> 				.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
> 					ObjectMapper mapper = new ObjectMapper();
> 					ArrayNode arrayNode = mapper.createArrayNode();
> 					for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
> 						try {
> 							ObjectNode objectNode = mapper.createObjectNode();
> 							objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
> 							objectNode.put("value",resultSet.getString(i));
> 							arrayNode.add(objectNode);
> 						} catch (Exception e) {
> 							throw e;
> 						}
> 					}
> 					return mapper.writeValueAsString(arrayNode);
> 				})
> 		)
> 		;
> 		State result = p.run().waitUntilFinish();
> 		System.out.println(result);
> 	}
> private static class GetRatePlanID extends DoFn<String, KV<String, Integer>> {
> 		@ProcessElement
> 		public void processElement(ProcessContext c)
> 		{
> 			String[] data = c.element().split(",");
> 			Integer plankey = Integer.parseInt(data[0]);
> 			String planid = data[1];
> 			c.output(KV.of(planid, plankey));
> 		}
> 	}{code}
>  
> Error:
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV 
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) 
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) 
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) 
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) 
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) 
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) 
>     at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV 
>     at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1) 
>     at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910){code}
>  
>  Kindly suggest how we can resolve this. If there is a reference for this use case, please share a link or snippets.
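
A likely cause, judging only from the snippet above: the "Format Result" step turns each KV into a String, but the raw `PCollection data1` together with `@SuppressWarnings("unchecked")` lets `readAll()` be declared for KV elements anyway, so the mismatch only surfaces at runtime inside `setParameters`. The frame `FiletoDB$1.setParameters(FiletoDB.java:1)` would be consistent with a compiler-generated bridge method performing that cast. The sketch below models this with the JDK only; it is not Beam code, and every name in it (`ParameterTypeDemo`, `Setter`, `readAll`, the sample data, and the assumption that the plan key is the second field) is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** JDK-only model of the failure. All names are hypothetical; none is Beam API. */
final class ParameterTypeDemo {

    /** Stand-in for a parameter setter: extracts the query parameter from one element. */
    interface Setter<T> {
        int planKey(T element);
    }

    /** Stand-in for a read-all step: passes every upstream element to the setter. */
    static <T> List<Integer> readAll(List<T> input, Setter<T> setter) {
        List<Integer> keys = new ArrayList<>();
        for (T element : input) {
            keys.add(setter.planKey(element));
        }
        return keys;
    }

    /** Sample (planId, planKey) pairs, assumed data for illustration only. */
    static List<Map.Entry<String, Integer>> sample() {
        List<Map.Entry<String, Integer>> kvs = new ArrayList<>();
        kvs.add(new SimpleEntry<>("planA", 101));
        kvs.add(new SimpleEntry<>("planB", 202));
        return kvs;
    }

    /** Mirrors the "Format Result" step: each pair becomes a "key,value" string. */
    static List<String> format(List<Map.Entry<String, Integer>> kvs) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Integer> kv : kvs) {
            lines.add(kv.getKey() + "," + kv.getValue());
        }
        return lines;
    }

    /** Raw type plus suppressed warnings: compiles, but fails when the setter runs. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean rawTypeHidesMismatch() {
        List raw = format(sample()); // actually holds Strings, like `PCollection data1`
        Setter<Map.Entry<String, Integer>> setter = new Setter<Map.Entry<String, Integer>>() {
            @Override
            public int planKey(Map.Entry<String, Integer> element) {
                return element.getValue();
            }
        };
        try {
            readAll(raw, setter); // the String element is cast to Map.Entry here
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    /** Typed version: the setter accepts the real element type and parses it. */
    static List<Integer> typedFix() {
        List<String> lines = format(sample());
        // Assumption: the plan key is the second comma-separated field.
        return readAll(lines, line -> Integer.parseInt(line.split(",")[1]));
    }

    public static void main(String[] args) {
        System.out.println("raw types fail at runtime: " + rawTypeHidesMismatch());
        System.out.println("typed keys: " + typedFix());
    }
}
```

Applied to the pipeline in the report, the same idea would mean declaring `data1` as `PCollection<String>` and using `JdbcIO.<String, String>readAll()` with a `PreparedStatementSetter<String>`, or alternatively dropping the "Format Result" step so that the elements stay `KV<String, Integer>` and the `readAll()` type parameters match that. With the types written out instead of suppressed, the compiler should reject the mismatch before the pipeline runs.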



--
This message was sent by Atlassian Jira
(v8.3.4#803005)