Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/02 14:17:11 UTC

[GitHub] [hudi] soumilshah1995 opened a new issue, #7591: [SUPPORT] Kinesis Data Analytics Flink to HUDI

soumilshah1995 opened a new issue, #7591:
URL: https://github.com/apache/hudi/issues/7591

   Good Morning 
   I am well aware that we can spin up Flink on EMR and get this to work, but a lot of people would love to use the natively managed Apache Flink that is available in the Kinesis Data Analytics console. My question: has anyone else requested Hudi support in managed Kinesis Data Analytics (Flink)?
   
   ### Steps
   ```
   %flink.ssql
   
   DROP TABLE if exists tbl_orders;
   
   CREATE TABLE tbl_orders (
       orderid VARCHAR,
       customer_id VARCHAR,
       ts TIMESTAMP(3),
       order_value DOUBLE,
       priority VARCHAR,
       WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
   
   )
   WITH (
       'connector' = 'kinesis',
       'stream' = 'order_streams',
       'aws.region' = 'us-east-1',
       'scan.stream.initpos' = 'LATEST',
       'format' = 'json',
       'json.timestamp-format.standard' = 'ISO-8601'
       );
   
   
   DROP TABLE if exists tbl_orders_hudi;
   
   CREATE TABLE tbl_orders_hudi (
       orderid VARCHAR,
       customer_id VARCHAR,
       order_value DOUBLE
   
   )
   WITH (
       'connector' = 'hudi',
       'path' = 's3://glue-learn-begineers/tmp/',
       'table.type' = 'MERGE_ON_READ' 
       );
   
   ```
   #### Able to select data from my kinesis streams 
   ![image](https://user-images.githubusercontent.com/39345855/210243080-98500324-0c81-413a-8072-45a8d0668b0b.png)
   
   ####  Fails 
   ```
   %flink.ssql(type=update)
   
   INSERT INTO tbl_orders_hudi
   SELECT 
       orderid ,
       customer_id ,
       order_value 
   FROM tbl_orders;
   
   ```
   
   #### Error Messages 
   
   ```
   Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
   
   Table options are:
   
   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
   java.io.IOException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
   
   Table options are:
   
   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
   	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
   	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
   	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
   	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
   	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
   	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
   
   Table options are:
   
   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
   	at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
   	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
   	at scala.collection.Iterator.foreach(Iterator.scala:937)
   	at scala.collection.Iterator.foreach$(Iterator.scala:937)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
   	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
   	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
   	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
   	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
   	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
   	... 14 more
   Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
   
   Table options are:
   
   'connector'='hudi'
   'path'='s3://glue-learn-begineers/tmp/'
   'table.type'='MERGE_ON_READ'
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
   	at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSink(HiveDynamicTableFactory.java:81)
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
   	... 32 more
   Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='hudi'
   	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
   	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
   	at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:167)
   	... 34 more
   Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
   
   Available factory identifiers are:
   
   blackhole
   datagen
   filesystem
   kafka
   kinesis
   print
   upsert-kafka
   	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
   	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
   	... 36 more
   ```
   
   I am assuming that Hudi is not supported natively in KDA. Am I correct?
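
The innermost `Caused by` in the trace above lists the connector factories Flink found on the classpath, and `hudi` is not among them. A minimal Python sketch of pulling that list out of such an error message follows. The helper name `available_factories` is hypothetical, and the parsing assumes the message layout shown in the trace above.

```python
from typing import List


def available_factories(error_text: str) -> List[str]:
    """Return the identifiers listed after 'Available factory identifiers are:'."""
    marker = "Available factory identifiers are:"
    _, found, tail = error_text.partition(marker)
    if not found:
        return []  # the marker is absent, so there is nothing to report
    identifiers = []
    for line in tail.splitlines():
        name = line.strip()
        if not name:
            continue  # skip blank lines around the list
        if name.startswith("at ") or name.startswith("..."):
            break  # stack frames begin here, so the list is over
        identifiers.append(name)
    return identifiers


# Sample text copied from the error message above.
SAMPLE = """Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
kinesis
print
upsert-kafka
\tat org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
"""

factories = available_factories(SAMPLE)
print("hudi" in factories)  # prints False
```

If `hudi` is missing from the list, the Hudi Flink bundle JAR is not on the notebook's classpath, which matches the error reported here.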


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375220406

   @soumilshah1995 I used version 1.13.2 of the package, _flink-s3-fs-hadoop-1.13.2.jar_, rather than _flink-s3-fs-hadoop-1.13.0.jar_, and it works some of the time. The resources within KDA Studio do not seem very stable, but in my opinion it is still worth a try.
   
   By the way, it is really hard to troubleshoot because the CloudWatch log stream is constantly spammed with errors like the one below, and no more useful information shows what is going on under the hood.
   ```
   {
       "applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxx:application/kda-studio-david",
       "applicationVersionId": "19",
       "locationInformation": "org.apache.zeppelin.flink.JobManager$FlinkJobProgressPoller.run(JobManager.java:269)",
       "logger": "org.apache.zeppelin.flink.JobManager",
       "message": "Fail to poll flink job progress via rest api",
       "messageSchemaVersion": "1",
       "messageType": "ERROR",
       "threadName": "JobProgressPoller-Thread-paragraph_1673235629990_1003353402",
       "throwableInformation": "com.mashape.unirest.http.exceptions.UnirestException: java.lang.RuntimeException: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.HttpClientHelper.request(HttpClientHelper.java:143)\n\tat com.mashape.unirest.request.BaseRequest.asJson(BaseRequest.java:68)\n\tat org.apache.zeppelin.flink.JobManager$FlinkJobProgressPoller.run(JobManager.java:212)\nCaused by: java.lang.RuntimeException: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.HttpResponse.<init>(HttpResponse.java:106)\n\tat com.mashape.unirest.http.HttpClientHelper.request(HttpClientHelper.java:139)\n\t... 2 more\nCaused by: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.JsonNode.<init>(JsonNode.java:51)\n\tat com.mashape.unirest.http.HttpResponse.<init>(HttpResponse.java:95)\n\t... 3 more\nCaused by: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat org.json.JSONTokener.syntaxError(JSONTokener.java:433)\n\tat org.json.JSONArray.<init>(JSONArray.java:106)\n\tat org.json.JSONArray.<init>(JSONArray.java:145)\n\tat com.mashape.unirest.http.JsonNode.<init>(JsonNode.java:48)\n\t... 4 more\n"
   }
   ```
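
One way to cope with this log spam is to filter the poller errors out after exporting the events. The following is a minimal sketch under stated assumptions: the events are already available as JSON strings shaped like the one above, and the helper name `drop_poller_noise` and the second sample event are hypothetical.

```python
import json
from typing import Iterable, List

# Message text taken from the log event shown above.
NOISY_MESSAGE = "Fail to poll flink job progress via rest api"


def drop_poller_noise(raw_events: Iterable[str]) -> List[dict]:
    """Parse JSON log events and drop the job-progress poller errors."""
    kept = []
    for raw in raw_events:
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            continue  # not a JSON event, so ignore it
        if not isinstance(event, dict):
            continue  # valid JSON but not a log event object
        if event.get("message") == NOISY_MESSAGE:
            continue  # the spam described above
        kept.append(event)
    return kept


events = [
    json.dumps({"logger": "org.apache.zeppelin.flink.JobManager",
                "message": NOISY_MESSAGE, "messageType": "ERROR"}),
    # Hypothetical event standing in for a log line worth reading.
    json.dumps({"logger": "example.Logger",
                "message": "hypothetical useful error", "messageType": "ERROR"}),
    "not json",
]

for event in drop_poller_noise(events):
    print(event["message"])
```

Filtering on the exact `message` value keeps other errors from the same logger visible.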




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382785804

   Since the issue has been resolved, I will close this.
   I will reopen it if I find bugs. Thank you all.




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374565548

   @davidshtian




[GitHub] [hudi] Adi0000 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by "Adi0000 (via GitHub)" <gi...@apache.org>.
Adi0000 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1585790229

   Hi @soumilshah1995, I am trying to insert into a Hudi table from a source table, but I am getting the error below. I tried restarting multiple times and also increased the KPUs to 12, but I get the same error. Can you please help? I am following your project: RDS -> DMS -> Kinesis -> Kinesis Data Analytics -> Hudi.
   
   ```
   java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO cust_info_hudi (
       SELECT
           data.id,
           data.first_name,
           data.middle_name,
           data.last_name,
           data.about_me,
           data.phone_number,
           data.age,
           data.address,
           data.street,
           data.dist,
           data.city,
           data.state,
           data.country
       FROM
           source_cust_info
       )'.
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
   	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
   	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
   	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
   	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
   	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
   	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO cust_info_hudi (
       SELECT
           data.id,
           data.first_name,
           data.middle_name,
           data.last_name,
           data.about_me,
           data.phone_number,
           data.age,
           data.address,
           data.street,
           data.dist,
           data.city,
           data.state,
           data.country
       FROM
           source_cust_info
       )'.
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
   	at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
   	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
   	... 14 more
   Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
   	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
   	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
   	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
   	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
   	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
   	... 1 more
   ```




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382480482

   @davidshtian It would be hard to tell the exact root cause.
   I think the issue was with the JAR files. Here is a video for the community:
   https://www.youtube.com/watch?v=8XS8egfrS_o&t=148s
   
   However, I am trying some other things out and running into edge cases. Would you recommend opening a new support issue, or should we continue here?
   




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1381511651

   @soumilshah1995 Great~ Looking forward to the tutorial! I am curious what the root cause was in your case. Thanks~




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1370164850

   Here are the details again; I tried again this morning.
   
   #### Please note: this time I am on US-WEST-2; previously I was trying on US-EAST-1
   
   #### Kinesis Streams 
   ![image](https://user-images.githubusercontent.com/39345855/210422439-4c008e1e-ff85-421b-8fc5-75b8b635896d.png)
   
   ### Python Code to Dump Dummy Data 
   ```
   import datetime
   import json
   import os
   import random
   import time
   import uuid
   
   import boto3
   from dotenv import load_dotenv
   
   # Load AWS settings from a local .env file.
   load_dotenv(".env")
   
   # Create the Kinesis client once rather than on every loop iteration.
   kinesis_client = boto3.client('kinesis',
                                 region_name=os.getenv("DEV_AWS_REGION_NAME"),
                                 aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
                                 aws_secret_access_key=os.getenv("DEV_SECRET_KEY")
                                 )
   
   
   def getReferrer():
       """Build one dummy stock tick with a unique id and an ISO-8601 timestamp."""
       data = {}
       # `datetime` is imported as a module, so the class is datetime.datetime.
       now = datetime.datetime.now()
       str_now = now.isoformat()
       data['uuid'] = str(uuid.uuid4())
       data['event_time'] = str_now
   
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   # Send one record to the stream every three seconds.
   while True:
       data = json.dumps(getReferrer())
       print(data)
   
       res = kinesis_client.put_record(
           StreamName="stock-streams",
           Data=data,
           PartitionKey="1")
       time.sleep(3)
   ```
   #### KDA
   
   ![image](https://user-images.githubusercontent.com/39345855/210422526-8e61f16e-c457-4aea-8a2f-c59ec03172f7.png)
   
   #### Settings Added JAR Files 
   ![image](https://user-images.githubusercontent.com/39345855/210422789-a6f08f42-2ad9-46c6-a8d9-b05ba150a6ba.png)
   
   ```
   %flink.ssql(type=update)
   
   DROP TABLE if exists stock_table;
   
   CREATE TABLE stock_table (
       uuid varchar,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3),
       WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
   )
   PARTITIONED BY (ticker)
   WITH (
       'connector' = 'kinesis',
       'stream' = 'stock-streams',
       'aws.region' = 'us-west-2',
       'scan.stream.initpos' = 'LATEST',
       'format' = 'json',
       'json.timestamp-format.standard' = 'ISO-8601'
   );
   ```
   ![image](https://user-images.githubusercontent.com/39345855/210428825-c54b6e4b-c262-409d-ae25-ccb134c0b011.png)
   
   ```
   %flink.ssql(type=update)
   
   DROP TABLE if exists stock_table_hudi;
   
   CREATE TABLE stock_table_hudi(
       uuid varchar,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3)
   )
   WITH (
     'connector' = 'hudi',
     'path' = 's3://soumil-dms-learn',
     'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
   );
   ```
   ![image](https://user-images.githubusercontent.com/39345855/210429264-f131e2ab-14ee-47a9-beff-a98c8c598b3f.png)
   
   # Real Time Data 
   ![image](https://user-images.githubusercontent.com/39345855/210429752-6baaff85-eaf6-44b2-bb9e-cdf68b34ac06.png)
   
   
   ## Error Messages Same as above
   
   ```
   ConnectException: Connection refused
   java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi 
   SELECT  uuid, ticker, price, event_time as ts from stock_table'.
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
   	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
   	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
   	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
   	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
   	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
   	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi 
   SELECT  uuid, ticker, price, event_time as ts from stock_table'.
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
   	at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
   	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
   	... 14 more
   Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
   	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
   	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
   	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
   	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
   	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
   	... 1 more
   Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
   	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
   	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
   	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$0(AbstractSessionClusterExecutor.java:83)
   	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
   	... 5 more
   Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
   	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
   	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
   	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
   	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	... 1 more
   Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
   	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
   	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
   	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
   	... 19 more
   Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
   Caused by: java.net.ConnectException: Connection refused
   	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
   	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
   	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   ![image](https://user-images.githubusercontent.com/39345855/210430926-2589ee44-8403-4383-a1e6-81a21b14a063.png)
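
In a trace this long, the innermost `Caused by` line is usually the most informative one. Here it is `java.net.ConnectException: Connection refused`, raised while the client tried to reach `zeppelin-flink/172.20.217.181:8082`. A minimal Python sketch of extracting that line follows. The helper name `root_cause` is hypothetical, and the sample is an abbreviated copy of the trace above.

```python
from typing import Optional


def root_cause(trace: str) -> Optional[str]:
    """Return the text of the last 'Caused by:' line in a Java stack trace."""
    prefix = "Caused by:"
    cause = None
    for line in trace.splitlines():
        stripped = line.strip()
        if stripped.startswith(prefix):
            # Later causes overwrite earlier ones, so the innermost wins.
            cause = stripped[len(prefix):].strip()
    return cause


# Abbreviated copy of the trace above, shortened for illustration.
SAMPLE_TRACE = """java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job
\tat org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
\t... 1 more
Caused by: java.net.ConnectException: Connection refused
\tat java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
"""

print(root_cause(SAMPLE_TRACE))
```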
   




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1368991339

   Someone has posted a solution: https://github.com/apache/hudi/issues/3707
   I will try these custom JAR files and let you know. If that works, I will close this ticket.
   




[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1369400346

   I see you use the streaming execution mode for the `VALUES` SQL statement. Did you try the batch execution mode instead?




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382693602

   > @davidshtian what are the recommended settings for querying the data in Athena, i.e. Hive sync?
   > 
   > ```
   > %flink.ssql(type=update)
   > DROP TABLE if exists orders;
   > CREATE TABLE orders(
   >     orderid VARCHAR PRIMARY KEY NOT ENFORCED,
   >     customer_id VARCHAR,
   >     ts TIMESTAMP(3),
   >     order_value DOUBLE,
   >     priority VARCHAR
   > )
   > WITH (
   >     'connector' = 'hudi',
   >     'path' = 's3a://soumilshah-hudi-demos/tmp/',
   >     'table.type' = 'COPY_ON_WRITE' ,
   >     'hoodie.embed.timeline.server' = 'false',
   >     'hive_sync.enable' = 'true',  
   >     'hive_sync.mode' = 'hms'     
   > 
   > );
   > ```
   > 
   > @davidshtian I am also curious why you used %ssql instead of %flink.ssql(type=update).
   > 
   > A follow-up question: at times KDA throws errors. Is that normal? It looks very unstable. What is your honest feedback on that?
   
   @soumilshah1995  %ssql is also configured by default.
   
   <img width="464" alt="image" src="https://user-images.githubusercontent.com/14228056/212458273-c02f5fc9-b508-4c88-8049-319ff998a0bb.png">
   
   For Glue catalog sync, yes, it should use hive_sync, but that requires uploading a newly built version of the hudi-flink bundle JAR (and related JARs), as the [hudi docs](https://hudi.apache.org/docs/0.10.1/syncing_metastore) explain:
   
   > hudi-flink-bundle module pom.xml sets the scope related to hive as provided by default. If you want to use hive sync, you need to use the profile flink-bundle-shade-hive during packaging. 
   
   ```
   'hive_sync.enable'='true',
   'hive_sync.db'='<your db>',
   'hive_sync.table' = '<your table>',
   'hive_sync.mode' = 'hms',
   ```
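
To keep these options consistent across notebook paragraphs, the WITH clause could be generated from a dictionary. This is only a minimal sketch: `render_with_clause` is a hypothetical helper (not part of Hudi, Flink or Zeppelin), the option keys are the ones quoted in this thread, and the database and table values are placeholders. It joins the entries so that no trailing comma is left before the closing parenthesis.

```python
def render_with_clause(options: dict) -> str:
    """Render a dict of connector options as a Flink SQL WITH (...) clause."""

    def quote(value) -> str:
        # SQL string literal: wrap in single quotes and double any embedded quote.
        return "'" + str(value).replace("'", "''") + "'"

    lines = [f"    {quote(key)} = {quote(value)}" for key, value in options.items()]
    return "WITH (\n" + ",\n".join(lines) + "\n)"


# Option keys taken from this thread; the <...> values are placeholders.
hudi_options = {
    "connector": "hudi",
    "path": "s3a://soumilshah-hudi-demos/tmp/",
    "table.type": "COPY_ON_WRITE",
    "hive_sync.enable": "true",
    "hive_sync.db": "<your db>",
    "hive_sync.table": "<your table>",
    "hive_sync.mode": "hms",
}

print(render_with_clause(hudi_options))
```

The rendered clause can be appended to the column list of a `CREATE TABLE` statement in a notebook paragraph.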
   
   I tried it but the sync failed; I am not sure whether it is due to this part:
   <img width="832" alt="image" src="https://user-images.githubusercontent.com/14228056/212463692-f06859a6-3b3c-4794-b356-06209338808e.png">
   
   So I created the table for Athena manually. 
   
   Cool content on Youtube~ 👍




[GitHub] [hudi] soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13  to HUDI 
URL: https://github.com/apache/hudi/issues/7591




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1370928644

   Hey guys, good morning. Any updates?




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1371464600

   Good evening, do we have any updates?
   




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1586164159

   > i am doing your project RDS -> DMS -> KINESI
   
   Hey, it looks like a JAR or configuration issue.
   Can you follow the steps here:
   https://github.com/soumilshah1995/aws-dms-kinesis-flink-hudi
   
   https://github.com/soumilshah1995/dynamodb-flink-kinesis




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374848788

   # Detailed steps I followed, so that anyone can replicate the issue
   @nsivabalan  @davidshtian @danny0405  
   
   
   # Step 1:  Download the JARs and upload them to S3 
   •	https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.13.0
   •	https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar
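
Note that the first link is a repository landing page rather than a direct download. As a minimal sketch, assuming the standard Maven Central directory layout, the direct JAR URL can be derived from the Maven coordinates. `maven_jar_url` is a hypothetical helper written for illustration, not part of any tool used in this thread.

```python
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build the download URL of a JAR from its Maven coordinates."""
    # Maven repositories store the group id as nested directories.
    group_path = group_id.replace(".", "/")
    return (
        f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/"
        f"{artifact_id}-{version}.jar"
    )


# The two artifacts listed in Step 1.
JARS = [
    ("org.apache.flink", "flink-s3-fs-hadoop", "1.13.0"),
    ("org.apache.hudi", "hudi-flink-bundle_2.12", "0.10.1"),
]

for coordinates in JARS:
    print(maven_jar_url(*coordinates))
```

For the Hudi bundle this reproduces the second link above.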
   
   # Step 2:  Create KDA Application 
   
   ![image](https://user-images.githubusercontent.com/39345855/211200378-71b50563-4ca8-4c57-afbc-e30ee86ad9d1.png)
   
   ### Provide Name to KDA APP
   ![image](https://user-images.githubusercontent.com/39345855/211200437-1af10bac-b2dd-40e6-9686-7c41c2a092c7.png)
   
   ### Select the Glue database 
   ![image](https://user-images.githubusercontent.com/39345855/211200482-49b07851-35fd-4260-9426-691b324becad.png)
   
   #### NOTE: I have given the application Admin access, so there should not be any permission problem 
   
   ### Add the custom JARs we downloaded 
   ![image](https://user-images.githubusercontent.com/39345855/211200540-58facbee-50ec-4ca3-abb8-3f6c5eec7fbd.png)
   
   ![image](https://user-images.githubusercontent.com/39345855/211200599-7a362e7c-b4f8-422d-a18d-62566fd0ed58.png)
   ![image](https://user-images.githubusercontent.com/39345855/211200607-f2ec629b-3adf-437b-bc6e-9807c696711f.png)
   
   # Step 3:  Launch Zeppelin Notebooks
   ![image](https://user-images.githubusercontent.com/39345855/211200880-6eb1d210-adcb-4a58-98a2-d3e4fabcec4e.png)
   
   # Step 4:  Create a Kinesis Stream called stock-streams
   
   ![image](https://user-images.githubusercontent.com/39345855/211201204-f38b196e-6080-4831-aebb-72feb529ac38.png)
   
   ##### Add some data into streams 
   
   ```
   import datetime
   import json
   import os
   import random
   import time
   import uuid
   
   import boto3
   from dotenv import load_dotenv
   
   # Read DEV_ACCESS_KEY and DEV_SECRET_KEY from a local .env file.
   load_dotenv(".env")
   
   # Create the client once instead of on every loop iteration.
   kinesis_client = boto3.client(
       "kinesis",
       region_name="us-west-2",
       aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
       aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
   )
   
   
   def get_stock_event():
       """Build one random stock tick with an ISO-8601 event_time."""
       price = random.random() * 100
       return {
           "uuid": str(uuid.uuid4()),
           "event_time": datetime.datetime.now().isoformat(),
           "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "price": round(price, 2),
       }
   
   
   # Send one record every two seconds until interrupted.
   while True:
       data = json.dumps(get_stock_event())
       res = kinesis_client.put_record(
           StreamName="stock-streams",
           Data=data,
           PartitionKey="1",
       )
       print(data, " ", res)
       time.sleep(2)
   ```
   ![image](https://user-images.githubusercontent.com/39345855/211201231-b92861c9-4fc0-480a-9312-f966d4c0d81f.png)
   
   
   # Step 5: Let's try a hello world on HUDI 
   ```
   %flink.ssql(type=update)
   DROP TABLE if exists stock_table;
   CREATE TABLE stock_table (
       uuid varchar,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3),
       WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
   )
       PARTITIONED BY (ticker)
   WITH (
       'connector' = 'kinesis',
       'stream' = 'stock-streams',
       'aws.region' = 'us-west-2',
       'scan.stream.initpos' = 'TRIM_HORIZON',
       'format' = 'json',
       'json.timestamp-format.standard' = 'ISO-8601'
   );
   ```
   ![image](https://user-images.githubusercontent.com/39345855/211201321-3c2366dc-ddf0-4487-a925-38922a942f43.png)
   ![image](https://user-images.githubusercontent.com/39345855/211201361-446728c9-833e-4708-9b9d-c39ea650392d.png)
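
Before running the INSERT it can help to confirm that the producer's JSON lines up with the source table. The following is a rough sketch only: `check_event` is a hypothetical helper, the column set is copied from the `stock_table` DDL, and Python's `datetime.fromisoformat` is used as a stand-in for Flink's own ISO-8601 parsing, which may accept a slightly different set of strings.

```python
import json
from datetime import datetime

# Columns of stock_table as declared in the DDL (the WATERMARK is not a column).
EXPECTED_COLUMNS = {"uuid", "ticker", "price", "event_time"}


def check_event(raw: str) -> list:
    """Return a list of problems found in one JSON record; empty means it looks fine."""
    problems = []
    event = json.loads(raw)
    missing = EXPECTED_COLUMNS - event.keys()
    if missing:
        problems.append(f"missing fields: {sorted(missing)}")
    if "event_time" in event:
        try:
            datetime.fromisoformat(event["event_time"])
        except (TypeError, ValueError):
            problems.append("event_time is not ISO-8601")
    if "price" in event and not isinstance(event["price"], (int, float)):
        problems.append("price is not numeric")
    return problems


sample = json.dumps(
    {
        "uuid": "abc",
        "event_time": "2023-01-08T10:15:30.123456",
        "ticker": "AAPL",
        "price": 12.34,
    }
)
print(check_event(sample))  # prints []
```
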
   ```
   %flink.ssql(type=update)
   DROP TABLE if exists stock_table_hudi;
   CREATE TABLE stock_table_hudi(
       uuid varchar  ,
       ticker VARCHAR,
       price DOUBLE,
       event_time TIMESTAMP(3),
       PRIMARY KEY (`uuid`) NOT Enforced
   
   )
   WITH (
       'connector' = 'hudi',
       'path' = 's3://soumilshah-hudi-demos/tmp/',
       'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
   );
   ```
   #### INSERT Into HUDI 
   ```
   %flink.ssql(type=update)
   INSERT INTO stock_table_hudi
   SELECT  uuid, ticker, price, event_time  from stock_table;
   ```
   
   
   # Error Messages 
   ```
   java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
   SELECT  uuid, ticker, price, event_time  from stock_table'.
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
   	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
   	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
   	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
   	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
   	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
   	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
   SELECT  uuid, ticker, price, event_time  from stock_table'.
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
   	at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
   	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
   	... 14 more
   Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
   	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
   	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
   	at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
   	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
   	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
   	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
   	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	... 1 more
   Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
   	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
   	... 21 more
   Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
   	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
   	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
   	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
   	... 19 more
   Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
   Caused by: java.net.ConnectException: Connection refused
   	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
   	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
   	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   ```
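
With a trace this long, the useful part is the last `Caused by:` entry. As a minimal sketch, assuming the standard Java stack trace layout, the cause chain can be pulled out with a few lines of Python. `cause_chain` and `root_cause` are hypothetical helpers, and the sample below is an abbreviated copy of the trace above.

```python
import re

# Matches "Caused by: <exception class>[: <message>]", allowing leading indentation.
CAUSED_BY = re.compile(r"^\s*Caused by:\s*(?P<cls>[\w.$]+)(?::\s*(?P<msg>.*))?$")


def cause_chain(trace: str) -> list:
    """Return (exception class, message) pairs for each 'Caused by:' line, outermost first."""
    chain = []
    for line in trace.splitlines():
        match = CAUSED_BY.match(line)
        if match:
            chain.append((match.group("cls"), match.group("msg") or ""))
    return chain


def root_cause(trace: str):
    """Return the innermost cause, or None if the trace has no 'Caused by:' line."""
    chain = cause_chain(trace)
    return chain[-1] if chain else None


sample = """java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job
\tat org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
Caused by: java.net.ConnectException: Connection refused
\tat java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
"""

print(root_cause(sample))
```

Here the innermost cause is the `java.net.ConnectException`, which points at the notebook failing to reach the Flink REST endpoint rather than at the Hudi table definition itself.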
   
   
   Please let me know if you need any other details. I have given all the steps needed to reproduce the problem. I am happy to hop on a call or meeting to walk through the steps too. Email: shahsoumil519@gmail.com
   
   




[GitHub] [hudi] soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13  to HUDI 
URL: https://github.com/apache/hudi/issues/7591




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1376687802

   @soumilshah1995 I tried again and it worked, as shown below for your reference. Thanks~
   
   **Step 1 – Kinesis Stream**
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456528-eb52b665-05aa-4cca-a44f-30b0d1948ab8.png">
   
   **Step 2 – Jars**
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456562-92dcd98e-9964-4db9-be71-5db6e9b3ef40.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456572-986a2a4a-8b2e-47bd-a205-72e265d359ae.png">
   
   **Step 3 – KDA Studio**
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456602-8813ca24-4b81-4b41-bcf6-c4cf31f64556.png">
   
   **Step 4 – Executing the code**
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456628-08f24462-54f1-453a-b46c-80c767752c25.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456638-d1f6d477-4ec2-4d6d-bcfd-4d53581e1ad4.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456650-7e0f8a22-83d0-456f-a309-2d46cadecd89.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456661-f2f95408-6bf8-4361-9e3d-d50974728b78.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456668-9a1ec0c1-bce5-4cbe-8607-2a98d4527741.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456684-dfb720af-1cd5-4da9-ac2a-12c28283d4ee.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456694-6b23055c-bba0-4077-b2de-bb21013448ce.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456708-bb8bc494-a0c8-438d-9d92-8e537023c6d3.png">
   <img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456716-9c4825e7-05a7-4d35-a4ef-8f966d39056e.png">
   




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1376707180

   From the logs _java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082_, it seems that either the Flink cluster was not started at all, or it might be a network issue.
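
One way to tell those two cases apart is a plain TCP probe from a machine on the same network. This is only a sketch under assumptions: `can_connect` is a hypothetical helper, and in a managed KDA Studio environment the `zeppelin-flink` host name is probably only resolvable from inside the service, so the probe may not be runnable from outside at all.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and host names that do not resolve.
        return False
```

For example, `can_connect("zeppelin-flink", 8082)` returning `False` would be consistent with the "Connection refused" in the trace, although it cannot distinguish a cluster that never started from a blocked network path.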




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1369696915

   Hey Danny,
   Yes, I tried that yesterday but could not get it to work.
   I keep getting the same error message.
   
   
   
   
   On Tue, Jan 3, 2023 at 12:08 AM Danny Chan ***@***.***> wrote:
   
   > I see you use the streaming execution mode for VALUES SQL statement, did
   > you try the batch execution mode instead then ?
   




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1373625630

   Hi Danny, I do not have logs; I am using the AWS Kinesis managed service for Flink.
   I have provided all the necessary steps, so the team can replicate the error if needed.
   Please let me know if you need any further information.
   
   




[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375058501

   Hello @umehrot2, sorry to bother you. Is there any chance you could help validate this? It seems this user is an AWS customer.




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382490644

   @davidshtian what are the recommended settings for querying the data in Athena, i.e. Hive sync?
   
   ```
   %flink.ssql(type=update)
   DROP TABLE if exists orders;
   CREATE TABLE orders(
       orderid VARCHAR PRIMARY KEY NOT ENFORCED,
       customer_id VARCHAR,
       ts TIMESTAMP(3),
       order_value DOUBLE,
       priority VARCHAR
   )
   WITH (
       'connector' = 'hudi',
       'path' = 's3a://soumilshah-hudi-demos/tmp/',
       'table.type' = 'COPY_ON_WRITE' ,
       'hoodie.embed.timeline.server' = 'false',
       'hive_sync.enable' = 'true',  
       'hive_sync.mode' = 'hms'     
   
   );
   
   ```




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1380372534

   I'm trying this today; let's hope for the best.
   




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382771434

   You are right, man.
   I created the table in Athena manually using a DDL statement.
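
For anyone else creating the table manually, here is a rough sketch of generating such a DDL statement for a non-partitioned COPY_ON_WRITE table. Everything in it is an assumption to verify against the Athena documentation for Hudi: `athena_hudi_ddl` is a hypothetical helper, the SerDe and input/output format class names are the ones I understand Athena expects for Hudi tables, the column type mapping is a guess (in particular `ts` as `timestamp`), and the location uses `s3://` rather than `s3a://`.

```python
# Metadata columns that Hudi adds to every record.
HUDI_META_COLUMNS = [
    ("_hoodie_commit_time", "string"),
    ("_hoodie_commit_seqno", "string"),
    ("_hoodie_record_key", "string"),
    ("_hoodie_partition_path", "string"),
    ("_hoodie_file_name", "string"),
]


def athena_hudi_ddl(table: str, columns: list, location: str) -> str:
    """Render a CREATE EXTERNAL TABLE statement for a non-partitioned Hudi table."""
    all_columns = HUDI_META_COLUMNS + list(columns)
    column_sql = ",\n".join(f"  `{name}` {col_type}" for name, col_type in all_columns)
    return (
        f"CREATE EXTERNAL TABLE `{table}` (\n"
        f"{column_sql}\n"
        ")\n"
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n"
        "STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'\n"
        "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'\n"
        f"LOCATION '{location}'"
    )


# Columns of the orders table from this thread; the Athena types are assumptions.
orders_columns = [
    ("orderid", "string"),
    ("customer_id", "string"),
    ("ts", "timestamp"),
    ("order_value", "double"),
    ("priority", "string"),
]

print(athena_hudi_ddl("orders", orders_columns, "s3://soumilshah-hudi-demos/tmp/"))
```

The printed statement can be pasted into the Athena query editor after checking the types against the Parquet files Hudi actually wrote.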




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375225426

   Or lots of similar logs like this "RemoteTransportException: Connection unexpectedly closed by remote task manager '142.151.160.20/142.151.160.20:6121'. This might indicate that the remote task manager was lost.
   This may indicate that the taskmanagers are overloaded."
   ```
   "applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxx:application/kda-studio-david",
   "applicationVersionId": "19",
   "locationInformation": "org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:163)",
   "logger": "org.apache.zeppelin.flink.FlinkSqlInterrpeter",
   "message": "Fail to run sql:INSERT INTO t1 SELECT uuid, event_time, ticker,price from stock_table",
   "messageSchemaVersion": "1",
   "messageType": "ERROR",
   "threadName": "ParallelScheduler-Worker-9",
   "throwableInformation": "java.io.IOException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3e6e3e07934084f5fafc542a4a91bc2c)
   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538
   at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97
   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273
   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160
   at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112
   at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47
   at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110
   at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852
   at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744
   at org.apache.zeppelin.scheduler.Job.run(Job.java:172
   at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132
   at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46
   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128
   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628
   at java.base/java.lang.Thread.run(Thread.java:829
   Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3e6e3e07934084f5fafc542a4a91bc2c
   at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:125
   at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642
   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
   at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394
   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
   at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670
   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
   at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394
   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
   at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610
   at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085
   at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478
   ... 3 more
   Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed
   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144
   at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:123
   ... 23 more
   Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138
   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82
   at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:222
   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212
   at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203
   at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:718
   at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79
   at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443
   at jdk.internal.reflect.GeneratedMethodAccessor100.invoke(Unknown Source
   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43
   at java.base/java.lang.reflect.Method.invoke(Method.java:566
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212
   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77
   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21
   at scala.PartialFunction.applyOrElse(PartialFunction.scala:123
   at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172
   at akka.actor.Actor.aroundReceive(Actor.scala:517
   at akka.actor.Actor.aroundReceive$(Actor.scala:515
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592
   at akka.actor.ActorCell.invoke(ActorCell.scala:561
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258
   at akka.dispatch.Mailbox.run(Mailbox.scala:225
   at akka.dispatch.Mailbox.exec(Mailbox.scala:235
   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260
   at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339
   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979
   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107
   Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '142.151.160.20/142.151.160.20:6121'. This might indicate that the remote task manager was lost
   at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241
   at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81
   at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241
   at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389
   at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354
   at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1106
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241
   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248
   at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901
   at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818
   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164
   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472
   at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384
   at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989
   at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74
   at java.base/java.lang.Thread.run(Thread.java:829)
   "
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1381518382

   @soumilshah1995 
   
   >i am going to work release a nice tutorial for community on Flink and hudi and hope 
   
   What great news! Welcome aboard the Hudi community. There are so many smart, friendly people here, and I think you can help make the community even better.
   




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1381626839

   @soumilshah1995 Is it related to the `s3` vs. `s3a` URI scheme?




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1380564963

   Dear  @davidshtian  
   
   I wanted to take a moment to thank you for the help you provided me. Your assistance was extremely valuable and I appreciate your willingness to lend a hand. Your support is greatly appreciated and I'm thankful for your help. 
   
   I was able to resolve this issue. I am going to release a nice tutorial for the community on Flink and Hudi, and I hope it helps everyone.
   
   Thanks again for your help. 
   
   Sincerely, 
   shah




[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1373214655

   I don't see any Hudi-related error in the stack trace, so it is hard to locate the cause of the problem. There is a `Connection refused` error thrown from Netty, so maybe the TaskManager has crashed. Can you check the TM log file for errors? Or the JM log?




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374699427

   > @davidshtian
   
   @soumilshah1995 Have you tried the 1.13.2 version of the package, _flink-s3-fs-hadoop-1.13.2.jar_? KDA [supports Apache Flink version 1.13.2](https://docs.aws.amazon.com/kinesisanalytics/latest/java/doc-history.html). Thanks~




[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375695222

   ### Thank you @davidshtian, here are my results based on your feedback
   
   * I am now trying this with the following changes:
   
   * flink-s3-fs-hadoop-1.13.2
   * hudi-flink-bundle_2.12-0.10.1.jar
   
   # Attempt 3
   # Step 1: Recreated the Stream
   ![image](https://user-images.githubusercontent.com/39345855/211321796-b46d6b5f-d981-4e3e-b0f5-e27c07f3ae63.png)
   
   
   # Step 2: Uploaded the JARs to S3 as suggested by David
   ![image](https://user-images.githubusercontent.com/39345855/211322361-dcff2d61-0665-4021-ae70-86a2099e2ac6.png)
   
   
   # Step 3: Creating KDA
   ![image](https://user-images.githubusercontent.com/39345855/211322739-d3b682d4-1348-4478-9589-f42c846e437b.png)
   
   # Step 4: Executing Code
   ![image](https://user-images.githubusercontent.com/39345855/211327790-41c2bfcf-7de9-44bc-9e7c-35f95c9ebcde.png)
   
   # Same Error 
   ```
   java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
   SELECT  uuid, ticker, price, event_time  from stock_table where uuid is NOT NULL'.
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
   	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
   	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
   	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
   	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
   	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
   	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
   	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
   SELECT  uuid, ticker, price, event_time  from stock_table where uuid is NOT NULL'.
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
   	at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
   	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
   	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
   	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
   	... 14 more
   Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
   	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
   	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
   	at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
   	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
   	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
   	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
   	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	... 1 more
   Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
   	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
   	... 21 more
   Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082
   	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
   	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
   	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
   	... 19 more
   Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082
   Caused by: java.net.ConnectException: Connection refused
   	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
   	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
   	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
   	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
   	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   
   




[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI

Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375398513

   @soumilshah1995 For your reference, in order to write data to the S3 Hudi sink, _checkpointing_ needs to be enabled (restart the interpreter, then execute).
   ```
   %flink.conf
   execution.checkpointing.interval 1000
   ```
   I also added the option below to the Hudi table to disable the embedded timeline server; otherwise, errors like _"Caused by: org.apache.hudi.exception.HoodieRemoteException: Connect to 142.151.165.206:37851 [/142.151.165.206] failed: Connection timed out (Connection timed out)"_ are thrown (based on actual tests).
   ```
   'hoodie.embed.timeline.server' = 'false'
   ```
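   
   For illustration, the table option above might be placed in the sink DDL roughly as follows. This is only a sketch, not a tested configuration: it reuses the `tbl_orders_hudi` definition and path from the original report, is meant to run in a `%flink.ssql` paragraph after the `%flink.conf` paragraph above has taken effect, and assumes `orderid` can serve as the record key (the `PRIMARY KEY` clause is not part of the original DDL).
   
   ```sql
   -- Sketch only: table name, columns and path come from the original report.
   DROP TABLE IF EXISTS tbl_orders_hudi;
   
   CREATE TABLE tbl_orders_hudi (
       -- Assumption: orderid is unique enough to act as the Hudi record key.
       orderid VARCHAR PRIMARY KEY NOT ENFORCED,
       customer_id VARCHAR,
       order_value DOUBLE
   )
   WITH (
       'connector' = 'hudi',
       'path' = 's3://glue-learn-begineers/tmp/',
       'table.type' = 'MERGE_ON_READ',
       -- Disable the embedded timeline server, as described above.
       'hoodie.embed.timeline.server' = 'false'
   );
   ```
   
   The Hudi Flink writer commits data when a checkpoint completes, which is likely why nothing appears on S3 until checkpointing is enabled.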
   Thanks~

