Posted to dev@iceberg.apache.org by asha jyothi <as...@gmail.com> on 2021/02/04 18:34:07 UTC

Need help inserting data into Hadoop table with Flink SQL in Java

Hi Team,

I am new to Flink and Iceberg. I am trying to add rows to an existing Iceberg table from a Flink job with Flink SQL. I'm able to do that using Flink SQL on the command line, but when I try it in Java, I get errors. It's looking for a connector, and I couldn't figure out from the documentation which Iceberg connector details I can pass in from Java code. Can you please help me with this?
Below is my code and the error that I'm getting. Thanks in advance.


    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inBatchMode()
                    .withBuiltInCatalogName("flink_hadoop_catalog")
                    .withBuiltInDatabaseName("flink_iceberg_db")
                    .build());

    // exec(tEnv, format, args...) is a helper in TransactionSinkFunction2 (not shown);
    // per the stack trace below, it ends in tEnv.executeSql(...).
    try {
        exec(tEnv, "CREATE TABLE flink_hadoop_catalog.flink_iceberg_db.transactions%d (AccountId BIGINT, Timestamp1 BIGINT, Amount DOUBLE)", value.getLong(0));
    } catch (AlreadyExistsException e) {
        // do nothing
    }

    String query = "INSERT INTO %s VALUES (%d,%d,%f)";
    TableResult tr = exec(tEnv, query, "flink_hadoop_catalog.flink_iceberg_db.transactions" + value.getLong(0),
            value.getLong(0), value.getLong(1), value.getDouble(2));


Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'flink_hadoop_catalog.flink_iceberg_db.transactions1'.

Table options are:
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:349)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
	at spendreport.TransactionSinkFunction2.exec(TransactionSinkFunction2.java:123)
	at spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:102)
	at spendreport.TransactionSinkFunction2.invoke(TransactionSinkFunction2.java:55)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
	at spendreport.FraudDetector.processElement(FraudDetector.java:136)
	at spendreport.FraudDetector.processElement(FraudDetector.java:45)
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector.
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
	... 42 more

Thanks,
Asha Desu
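A minimal sketch of one likely fix, under stated assumptions. withBuiltInCatalogName(...) only renames Flink's built-in in-memory catalog, so a table created there without a WITH clause carries no 'connector' option, which matches the final "Caused by" in the trace above. The Iceberg Flink integration instead expects an Iceberg catalog to be registered with CREATE CATALOG ... WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'=...), presumably the same DDL that was run in the SQL client. The class IcebergHadoopCatalogSql, its method names, the catalog name iceberg_hadoop_catalog and the warehouse path are all hypothetical placeholders. The sketch only builds the statement strings so it runs without Flink on the classpath; in a real job each string would be passed to tEnv.executeSql(...), with the iceberg-flink-runtime jar on the classpath.

```java
import java.util.Locale;

// Hypothetical helper (not part of Flink or Iceberg): it only builds SQL strings.
// In a Flink job, each returned string would be passed to tEnv.executeSql(...).
public class IcebergHadoopCatalogSql {

    // Registers an Iceberg catalog backed by a Hadoop warehouse directory.
    // The option keys follow the Iceberg Flink documentation; the warehouse
    // value is a placeholder supplied by the caller.
    public static String createCatalog(String catalog, String warehouse) {
        return String.format(
                "CREATE CATALOG %s WITH ('type'='iceberg', 'catalog-type'='hadoop', "
                        + "'warehouse'='%s')",
                catalog, warehouse);
    }

    // Creates the database (namespace) inside the Iceberg catalog if it is missing.
    public static String createDatabase(String catalog, String database) {
        return String.format("CREATE DATABASE IF NOT EXISTS %s.%s", catalog, database);
    }

    // Same columns as the original post; IF NOT EXISTS replaces the try/catch.
    public static String createTable(String table) {
        return String.format(
                "CREATE TABLE IF NOT EXISTS %s (AccountId BIGINT, Timestamp1 BIGINT, Amount DOUBLE)",
                table);
    }

    // Locale.ROOT keeps '.' as the decimal separator whatever the JVM default locale is.
    public static String insert(String table, long accountId, long timestamp, double amount) {
        return String.format(Locale.ROOT, "INSERT INTO %s VALUES (%d, %d, %f)",
                table, accountId, timestamp, amount);
    }

    public static void main(String[] args) {
        String catalog = "iceberg_hadoop_catalog"; // hypothetical name
        String table = catalog + ".flink_iceberg_db.transactions1";
        // Intended to run once per TableEnvironment, not once per record:
        // registering the same catalog name twice is expected to fail.
        System.out.println(createCatalog(catalog, "hdfs://nn:8020/warehouse/path"));
        System.out.println(createDatabase(catalog, "flink_iceberg_db"));
        System.out.println(createTable(table));
        System.out.println(insert(table, 1L, 2L, 12.5));
    }
}
```

The sketch uses a new catalog name because, as long as withBuiltInCatalogName("flink_hadoop_catalog") stays in the EnvironmentSettings, that name is already taken by the in-memory catalog, so registering the Iceberg catalog under it would likely clash. Dropping that call (and withBuiltInDatabaseName) is the other option. If exec formats its arguments with String.format, the %f in the original INSERT follows the JVM default locale, and some locales print a comma as the decimal separator; passing Locale.ROOT avoids that.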

Re: Need help inserting data into Hadoop table with Flink SQL in Java

Posted by asha jyothi <as...@gmail.com>.
Hi Team,

Can you please help me with the issue I described in my earlier message?

Thanks,
Asha Desu
