Posted to issues@beam.apache.org by "Gaurav Khandelwal (Jira)" <ji...@apache.org> on 2021/03/17 15:06:00 UTC

[jira] [Created] (BEAM-12005) getting issue to load file into database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)

Gaurav Khandelwal created BEAM-12005:
----------------------------------------

             Summary: getting issue to load file into database (java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
                 Key: BEAM-12005
                 URL: https://issues.apache.org/jira/browse/BEAM-12005
             Project: Beam
          Issue Type: Bug
          Components: beam-community, beam-model, io-java-files, io-java-gcp, io-java-jdbc, runner-dataflow
    Affects Versions: 2.28.0
            Reporter: Gaurav Khandelwal
            Assignee: Aizhamal Nurmamat kyzy


Hi Team,

We are getting the error below:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
 
Our goal is to load a file into a database. We tried the following approach:
{code:java}
public static void main(String[] args) {
    PCSI02AOptions options =
        PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
    Pipeline p = Pipeline.create(options);

    PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
        .apply(ParDo.of(new GetRatePlanID()))
        .apply("Format Result",
            MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Integer> ABC) -> ABC.getKey() + "," + ABC.getValue()));

    data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
            .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
            .withUsername("abc")
            .withPassword("abc123"))
        .withCoder(StringUtf8Coder.of())
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
            @Override
            public void setParameters(KV<String, Iterable<Integer>> element,
                    PreparedStatement preparedStatement) throws Exception {
                String[] range = element.getKey().split(",");
                preparedStatement.setInt(1, Integer.parseInt(range[0]));
            }
        })
        .withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
        .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
            ObjectMapper mapper = new ObjectMapper();
            ArrayNode arrayNode = mapper.createArrayNode();
            for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                try {
                    ObjectNode objectNode = mapper.createObjectNode();
                    objectNode.put("column_name", resultSet.getMetaData().getColumnName(i));
                    objectNode.put("value", resultSet.getString(i));
                    arrayNode.add(objectNode);
                } catch (Exception e) {
                    throw e;
                }
            }
            return mapper.writeValueAsString(arrayNode);
        }));

    State result = p.run().waitUntilFinish();
    System.out.println(result);
}
{code}
 
 
Kindly suggest how we can resolve this. If there is any reference for this use case, please share a link or code snippets.
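
One possible reading of the error, offered as an assumption rather than a confirmed diagnosis: data1 is declared as a raw PCollection, so the compiler cannot notice that it holds String elements (the output of MapElements.into(TypeDescriptors.strings())) while JdbcIO.readAll() is parameterized with KV<String, Iterable<Integer>>. The self-contained sketch below reproduces that mechanism with plain JDK types only. RawTypeCastDemo, applyRaw, applyTyped and KEY_OF are hypothetical names invented for illustration, and Map.Entry merely stands in for Beam's KV.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RawTypeCastDemo {

    // Stand-in (assumption) for a step that expects key/value elements,
    // much like a parameter setter typed on KV<String, Iterable<Integer>>.
    public static final Function<Map.Entry<String, Integer>, String> KEY_OF =
            entry -> entry.getKey();

    // Raw types, mirroring a raw collection declaration: the compiler cannot
    // check that the element type matches what the step expects.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static List<String> applyRaw(List elements, Function step) {
        List<String> out = new ArrayList<>();
        for (Object element : elements) {
            out.add((String) step.apply(element));
        }
        return out;
    }

    // Fully typed variant: a mismatch between the element type and the step
    // is rejected at compile time instead of failing at runtime.
    public static <T> List<String> applyTyped(List<T> elements,
                                              Function<? super T, String> step) {
        List<String> out = new ArrayList<>();
        for (T element : elements) {
            out.add(step.apply(element));
        }
        return out;
    }

    public static void main(String[] args) {
        // Elements are already formatted strings, as after a "Format Result" step.
        List<String> formatted = Arrays.asList("101,1", "102,3");

        try {
            applyRaw(formatted, KEY_OF); // compiles, but each String is cast to Map.Entry
        } catch (ClassCastException e) {
            System.out.println("Raw call failed at runtime: " + e.getClass().getName());
        }

        // applyTyped(formatted, KEY_OF); // would not compile: String is not a Map.Entry

        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>("101", 1));
        pairs.add(new SimpleEntry<>("102", 3));
        System.out.println(applyTyped(pairs, KEY_OF));
    }
}
```

If this is indeed what happens in the pipeline, declaring data1 with its element type (for example PCollection<String>) should surface the mismatch at compile time, and the parameter type given to readAll() and to the PreparedStatementSetter would then need to agree with the elements that actually reach it.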


