You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 19:54:12 UTC

[GitHub] [beam] damccorm opened a new issue, #20842: getting issue to load file into database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)

damccorm opened a new issue, #20842:
URL: https://github.com/apache/beam/issues/20842

   Hi Team,
   
   We are getting below error :
    org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
     
    Our target is to load file into database. We tried following approach: 
   ```
   
   @SuppressWarnings("unchecked")
   	public static void main(String[] args) {
   		PCSI02AOptions options
   = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
   		Pipeline
   p = Pipeline.create(options);
   
   
   		PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
   					.apply(ParDo.of(new
   GetRatePlanID()))
   					.apply("Format Result", 
   							MapElements.into(TypeDescriptors.strings())
   							.via((KV<String,
   Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));
   
   
   		data1.apply(JdbcIO.<KV<String, Iterable<Integer>>,
   String>readAll()
   				.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
   						.create("com.mysql.cj.jdbc.Driver",
   "jdbc:mysql://localhost:3306/ABC")
   						.withUsername("abc")
   						.withPassword("abc123"))
   				.withCoder(StringUtf8Coder.of())
   				.withParameterSetter(new
   JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
   					@Override
   					public void
   setParameters(KV<String, Iterable<Integer>> element,
   							PreparedStatement preparedStatement) throws
   Exception {
   						String[] range = element.getKey().split(",");
   						preparedStatement.setInt(1,
   Integer.parseInt(range[0]));
   					}
   
   
   				}).withQuery("select * from ABC.PAY_PLAN_INFO where plan_key
   = ?")
   				.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
   					ObjectMapper mapper = new
   ObjectMapper();
   					ArrayNode arrayNode = mapper.createArrayNode();
   					for (int i = 1; i <= resultSet.getMetaData().getColumnCount();
   i++) {
   						try {
   							ObjectNode objectNode = mapper.createObjectNode();
   							objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
   							objectNode.put("value",resultSet.getString(i));
   							arrayNode.add(objectNode);
   						}
   catch (Exception e) {
   							throw e;
   						}
   					}
   					return mapper.writeValueAsString(arrayNode);
   				})
   		)
   		;
   
   
   		State
   result = p.run().waitUntilFinish();
   		System.out.println(result);
   	}
   
   
   private static class GetPlanID
   extends DoFn<String, KV<String, Integer>> {
   		@ProcessElement
   		public void processElement(ProcessContext
   c)
   		{
   			String[] data = c.element().split(",");
   			Integer plankey = Integer.parseInt(data[0]);
   			String
   planid = data[1];
   			c.output(KV.of(planid, plankey));
   		}
   	}
   ```
   
    
   
   Error:
   ```
   
   Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException:
   java.lang.String cannot be cast to org.apache.beam.sdk.values.KVException in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
   java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV 
     
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
   
       at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
   
       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) 
       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
   
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) 
       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
   
       at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
   Caused by: java.lang.ClassCastException:
   java.lang.String cannot be cast to org.apache.beam.sdk.values.KV 
       at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1)
   
       at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910)
   ```
   
    
    Kindly suggest how we can resolve it ? Or do we have any reference for same if we have kindly share link or snippets.
   
   Imported from Jira [BEAM-12005](https://issues.apache.org/jira/browse/BEAM-12005). Original Jira may contain additional context.
   Reported by: khgaura.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org