Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/02 14:17:11 UTC
[GitHub] [hudi] soumilshah1995 opened a new issue, #7591: [SUPPORT] Kinesis Data Analytics Flink to HUDI
soumilshah1995 opened a new issue, #7591:
URL: https://github.com/apache/hudi/issues/7591
Good Morning
I am well aware that we can spin up Flink on EMR and get this to work. However, a lot of people would love to use the natively managed Apache Flink that is available in the Kinesis Data Analytics console. My question: has anyone else requested Hudi support in managed Kinesis Data Analytics (Flink)?
### Steps
```
%flink.ssql
DROP TABLE if exists tbl_orders;
CREATE TABLE tbl_orders (
orderid VARCHAR,
customer_id VARCHAR,
ts TIMESTAMP(3),
order_value DOUBLE,
priority VARCHAR,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'order_streams',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
DROP TABLE if exists tbl_orders_hudi;
CREATE TABLE tbl_orders_hudi (
orderid VARCHAR,
customer_id VARCHAR,
order_value DOUBLE
)
WITH (
'connector' = 'hudi',
'path' = 's3://glue-learn-begineers/tmp/',
'table.type' = 'MERGE_ON_READ'
);
```
#### Able to select data from my kinesis streams
![image](https://user-images.githubusercontent.com/39345855/210243080-98500324-0c81-413a-8072-45a8d0668b0b.png)
#### Fails
```
%flink.ssql(type=update)
INSERT INTO tbl_orders_hudi
SELECT
orderid ,
customer_id ,
order_value
FROM tbl_orders;
```
#### Error Messages
```
Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
java.io.IOException: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:373)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1510)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1460)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
... 14 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'hive.streamdb.tbl_orders_hudi'.
Table options are:
'connector'='hudi'
'path'='s3://glue-learn-begineers/tmp/'
'table.type'='MERGE_ON_READ'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
at org.apache.flink.connectors.hive.HiveDynamicTableFactory.createDynamicTableSink(HiveDynamicTableFactory.java:81)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
... 32 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='hudi'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:167)
... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kafka
kinesis
print
upsert-kafka
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
... 36 more
```
I am assuming that Hudi is not supported natively in KDA. Am I correct?
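For anyone reading a trace like this, the decisive part is the last `Caused by`: Flink lists the connector factories it found on the classpath, and `hudi` is not among them. Below is a minimal Python sketch that pulls that list out of a pasted error message. The helper name `available_factories` is hypothetical and not part of Flink or Hudi; the sample text is copied from the trace above.

```python
def available_factories(error_text):
    """Return the identifiers listed after 'Available factory identifiers are:'."""
    lines = [line.strip() for line in error_text.splitlines()]
    try:
        start = lines.index("Available factory identifiers are:") + 1
    except ValueError:
        return []  # marker not present in this trace
    found = []
    for line in lines[start:]:
        # The list ends at the first blank line or stack frame ("at ...").
        if not line or line.startswith("at ") or line.startswith("..."):
            break
        found.append(line)
    return found


sample = """Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hudi' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
kafka
kinesis
print
upsert-kafka
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
"""

factories = available_factories(sample)
print("hudi" in factories)
```

If `hudi` is missing from the returned list, the Hudi Flink bundle JAR is not on the notebook's classpath, which matches the error shown here.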
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375220406
@soumilshah1995 I used version 1.13.2 of the package (_flink-s3-fs-hadoop-1.13.2.jar_, not _flink-s3-fs-hadoop-1.13.0.jar_) and it worked some of the time. The resources within KDA Studio do not seem very stable, but in my opinion it is still worth a try.
By the way, it is really hard to troubleshoot because the CloudWatch log stream is constantly spammed with errors like the one below, with no useful information about what is going on under the hood.
```
{
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxx:application/kda-studio-david",
"applicationVersionId": "19",
"locationInformation": "org.apache.zeppelin.flink.JobManager$FlinkJobProgressPoller.run(JobManager.java:269)",
"logger": "org.apache.zeppelin.flink.JobManager",
"message": "Fail to poll flink job progress via rest api",
"messageSchemaVersion": "1",
"messageType": "ERROR",
"threadName": "JobProgressPoller-Thread-paragraph_1673235629990_1003353402",
"throwableInformation": "com.mashape.unirest.http.exceptions.UnirestException: java.lang.RuntimeException: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.HttpClientHelper.request(HttpClientHelper.java:143)\n\tat com.mashape.unirest.request.BaseRequest.asJson(BaseRequest.java:68)\n\tat org.apache.zeppelin.flink.JobManager$FlinkJobProgressPoller.run(JobManager.java:212)\nCaused by: java.lang.RuntimeException: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.HttpResponse.<init>(HttpResponse.java:106)\n\tat com.mashape.unirest.http.HttpClientHelper.request(HttpClientHelper.java:139)\n\t... 2 more\nCaused by: java.lang.RuntimeException: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat com.mashape.unirest.http.JsonNode.<init>(JsonNode.java:51)\n\tat com.mashape.unirest.http.HttpResponse.<init>(HttpResponse.java:95)\n\t... 3 more\nCaused by: org.json.JSONException: A JSONArray text must start with '[' at 1 [character 2 line 1]\n\tat org.json.JSONTokener.syntaxError(JSONTokener.java:433)\n\tat org.json.JSONArray.<init>(JSONArray.java:106)\n\tat org.json.JSONArray.<init>(JSONArray.java:145)\n\tat com.mashape.unirest.http.JsonNode.<init>(JsonNode.java:48)\n\t... 4 more\n"
}
```
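One way to cope with that noise is to export the log events and drop the poller errors before reading the rest. The sketch below assumes each event is a JSON string shaped like the one above. The function name `drop_poller_noise` and the sample events are illustrative only and are not part of any AWS or Zeppelin API.

```python
import json

# Values copied from the "logger" and "message" fields of the noisy event above.
NOISY_LOGGER = "org.apache.zeppelin.flink.JobManager"
NOISY_MESSAGE = "Fail to poll flink job progress via rest api"


def drop_poller_noise(raw_events):
    """Keep only events that are not the repeated job-progress poller error."""
    kept = []
    for raw in raw_events:
        try:
            event = json.loads(raw)
        except json.JSONDecodeError:
            kept.append(raw)  # keep anything that is not JSON; it may matter
            continue
        if not isinstance(event, dict):
            kept.append(raw)
            continue
        is_noise = (
            event.get("logger") == NOISY_LOGGER
            and event.get("message") == NOISY_MESSAGE
        )
        if not is_noise:
            kept.append(raw)
    return kept


# Hypothetical sample: one noisy event, one other JSON event, one plain line.
events = [
    json.dumps({"logger": NOISY_LOGGER, "message": NOISY_MESSAGE, "messageType": "ERROR"}),
    json.dumps({"logger": "some.other.Logger", "message": "something else", "messageType": "WARN"}),
    "plain text line",
]

print(len(drop_poller_noise(events)))
```

Matching on both the logger and the message keeps other errors from the same logger visible.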
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382785804
Since the issue has been resolved, I will close this.
I will reopen it if I find bugs. Thank you all.
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374565548
@davidshtian
[GitHub] [hudi] Adi0000 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by "Adi0000 (via GitHub)" <gi...@apache.org>.
Adi0000 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1585790229
Hi @soumilshah1995, I am trying to insert into a Hudi table from a source table but I am getting the error below. I restarted multiple times and also tried increasing the KPUs to 12, but I get the same error. Can you please help? I am following your project: RDS -> DMS -> Kinesis -> Kinesis Data Analytics -> Hudi.
```
java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO cust_info_hudi (
SELECT
data.id,
data.first_name,
data.middle_name,
data.last_name,
data.about_me,
data.phone_number,
data.age,
data.address,
data.street,
data.dist,
data.city,
data.state,
data.country
FROM
source_cust_info
)'.
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO cust_info_hudi (
SELECT
data.id,
data.first_name,
data.middle_name,
data.last_name,
data.about_me,
data.phone_number,
data.age,
data.address,
data.street,
data.dist,
data.city,
data.state,
data.country
FROM
source_cust_info
)'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
... 1 more
```
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382480482
@davidshtian It would be hard to tell the exact root cause.
I think the issue was with the JAR files. Here is a video for the community:
https://www.youtube.com/watch?v=8XS8egfrS_o&t=148s
However, I am trying some other things out and running into edge cases. Would you recommend opening a new support issue, or should we continue here?
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1381511651
@soumilshah1995 Great~ Looking forward to the tutorial! I am curious what the root cause was in your case. Thanks~
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1370164850
Here are the details again; I tried again this morning.
#### Please note: this time I am on US-WEST-2; previously I was trying on US-EAST-1
#### Kinesis Streams
![image](https://user-images.githubusercontent.com/39345855/210422439-4c008e1e-ff85-421b-8fc5-75b8b635896d.png)
### Python Code to Dump Dummy Data
```
import json
import os
import random
import time
import uuid
from datetime import datetime

import boto3
from dotenv import load_dotenv

# Load the AWS region and credentials from a local .env file.
load_dotenv(".env")

# Create the Kinesis client once instead of on every loop iteration.
kinesis_client = boto3.client(
    'kinesis',
    region_name=os.getenv("DEV_AWS_REGION_NAME"),
    aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
)


def getReferrer():
    """Build one dummy stock tick with an ISO-8601 event time."""
    data = {}
    now = datetime.now()
    str_now = now.isoformat()
    data['uuid'] = str(uuid.uuid4())
    data['event_time'] = str_now
    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data


# Publish one record every 3 seconds until interrupted.
while True:
    data = json.dumps(getReferrer())
    print(data)
    res = kinesis_client.put_record(
        StreamName="stock-streams",
        Data=data,
        PartitionKey="1")
    time.sleep(3)
```
#### KDA
![image](https://user-images.githubusercontent.com/39345855/210422526-8e61f16e-c457-4aea-8a2f-c59ec03172f7.png)
#### Settings Added JAR Files
![image](https://user-images.githubusercontent.com/39345855/210422789-a6f08f42-2ad9-46c6-a8d9-b05ba150a6ba.png)
```
%flink.ssql(type=update)
DROP TABLE if exists stock_table;
CREATE TABLE stock_table (
uuid varchar,
ticker VARCHAR,
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = 'stock-streams',
'aws.region' = 'us-west-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
```
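Because the source table uses `'format' = 'json'`, the JSON keys in each Kinesis record need to line up with the column names declared above. Here is a minimal Python sketch of a pre-flight check for producer records. The helper `check_record` and the sample records are hypothetical, and the ISO-8601 check uses Python's own parser, which is only an approximation of what Flink accepts.

```python
from datetime import datetime

# Column names copied from the stock_table DDL above.
STOCK_TABLE_COLUMNS = {"uuid", "ticker", "price", "event_time"}


def check_record(record):
    """Return a list of problems found in one producer record (empty if none)."""
    problems = []
    keys = set(record)
    missing = STOCK_TABLE_COLUMNS - keys
    extra = keys - STOCK_TABLE_COLUMNS
    if missing:
        problems.append("missing keys: " + ", ".join(sorted(missing)))
    if extra:
        problems.append("unexpected keys: " + ", ".join(sorted(extra)))
    if "event_time" in record:
        try:
            datetime.fromisoformat(record["event_time"])
        except (TypeError, ValueError):
            problems.append("event_time is not ISO-8601")
    return problems


good = {
    "uuid": "abc",
    "event_time": "2023-01-03T10:15:30.123456",
    "ticker": "AAPL",
    "price": 12.34,
}
bad = {"uuid": "abc", "ts": "yesterday"}

print(check_record(good))
print(check_record(bad))
```

Running a check like this on a few producer records before starting the notebook can rule out a schema mismatch as the cause of empty or null columns.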
![image](https://user-images.githubusercontent.com/39345855/210428825-c54b6e4b-c262-409d-ae25-ccb134c0b011.png)
```
%flink.ssql(type=update)
DROP TABLE if exists stock_table_hudi;
CREATE TABLE stock_table_hudi(
uuid varchar,
ticker VARCHAR,
price DOUBLE,
event_time TIMESTAMP(3)
)
WITH (
'connector' = 'hudi',
'path' = 's3://soumil-dms-learn',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);
```
![image](https://user-images.githubusercontent.com/39345855/210429264-f131e2ab-14ee-47a9-beff-a98c8c598b3f.png)
# Real Time Data
![image](https://user-images.githubusercontent.com/39345855/210429752-6baaff85-eaf6-44b2-bb9e-cdf68b34ac06.png)
## Error Messages Same as above
```
ConnectException: Connection refused
java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time as ts from stock_table'.
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time as ts from stock_table'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$0(AbstractSessionClusterExecutor.java:83)
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
... 5 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... 1 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.217.181:8082
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)
```
![image](https://user-images.githubusercontent.com/39345855/210430926-2589ee44-8403-4383-a1e6-81a21b14a063.png)
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1368991339
Someone has posted a solution: https://github.com/apache/hudi/issues/3707
I will try these custom JAR files and let you know. If that works, I will close this ticket.
[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1369400346
I see you are using the streaming execution mode for the `VALUES` SQL statement. Did you try the batch execution mode instead?
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382693602
> @davidshtian what are the recommended settings for querying data in Athena, AKA Hive sync?
>
> ```
> %flink.ssql(type=update)
> DROP TABLE if exists orders;
> CREATE TABLE orders(
> orderid VARCHAR PRIMARY KEY NOT ENFORCED,
> customer_id VARCHAR,
> ts TIMESTAMP(3),
> order_value DOUBLE,
> priority VARCHAR
> )
> WITH (
> 'connector' = 'hudi',
> 'path' = 's3a://soumilshah-hudi-demos/tmp/',
> 'table.type' = 'COPY_ON_WRITE' ,
> 'hoodie.embed.timeline.server' = 'false',
> 'hive_sync.enable' = 'true',
> 'hive_sync.mode' = 'hms'
>
> );
> ```
>
> @davidshtian also curious why you used %ssql instead of %flink.ssql(type=update)
>
> Also, a follow-up question: at times KDA throws errors. Is that normal? It looks very unstable. What is your honest feedback on that?
@soumilshah1995 %ssql is also configured by default.
<img width="464" alt="image" src="https://user-images.githubusercontent.com/14228056/212458273-c02f5fc9-b508-4c88-8049-319ff998a0bb.png">
For Glue catalog sync, yes, it should use hive_sync, but you need to upload a new build of the hudi-flink JAR file (and the related ones), as the [hudi docs](https://hudi.apache.org/docs/0.10.1/syncing_metastore) show:
> hudi-flink-bundle module pom.xml sets the scope related to hive as provided by default. If you want to use hive sync, you need to use the profile flink-bundle-shade-hive during packaging.
```
'hive_sync.enable'='true',
'hive_sync.db'='<your db>',
'hive_sync.table' = '<your table>',
'hive_sync.mode' = 'hms',
```
I tried it but it failed to sync; I am not sure whether it is due to this part:
<img width="832" alt="image" src="https://user-images.githubusercontent.com/14228056/212463692-f06859a6-3b3c-4794-b356-06209338808e.png">
So I created the table for Athena manually.
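For experimenting with the `hive_sync` options listed above, it can help to keep the connector options in one place and generate the `WITH` clause from them. This is a minimal Python sketch; the helper `render_with_clause` is hypothetical, and the database and table values are placeholders just like `<your db>` and `<your table>` above.

```python
def render_with_clause(options):
    """Render a dict of connector options as a Flink SQL WITH (...) clause."""
    rendered = []
    for key, value in options.items():
        # Single quotes inside SQL string literals are escaped by doubling them.
        safe_key = key.replace("'", "''")
        safe_value = str(value).replace("'", "''")
        rendered.append(f"  '{safe_key}' = '{safe_value}'")
    # Joining with ",\n" avoids a trailing comma after the last option.
    return "WITH (\n" + ",\n".join(rendered) + "\n)"


hudi_options = {
    "connector": "hudi",
    "path": "s3a://soumilshah-hudi-demos/tmp/",
    "table.type": "COPY_ON_WRITE",
    "hive_sync.enable": "true",
    "hive_sync.db": "<your db>",
    "hive_sync.table": "<your table>",
    "hive_sync.mode": "hms",
}

print(render_with_clause(hudi_options))
```

The output can be pasted after the column list of a `CREATE TABLE` statement in a notebook paragraph.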
Cool content on Youtube~ 👍
[GitHub] [hudi] soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
URL: https://github.com/apache/hudi/issues/7591
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1370928644
Hey guys, good morning. Any updates?
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1371464600
Good evening, do we have any updates?
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1586164159
> i am doing your project RDS -> DMS -> KINESI
Hey, this looks like a JAR or configuration issue.
Can you follow the steps here?
https://github.com/soumilshah1995/aws-dms-kinesis-flink-hudi
https://github.com/soumilshah1995/dynamodb-flink-kinesis
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374848788
# Detailed steps I followed, so that anyone can replicate the issue
@nsivabalan @davidshtian @danny0405
# Step 1: Download and Upload the JAR into S3
• https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop/1.13.0
• https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/hudi-flink-bundle_2.12-0.10.1.jar
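For anyone who prefers to script this step, a rough sketch follows. It assumes a boto3 S3 client with write access to a bucket of your own; the function names, the `jars/` prefix, and the direct Maven Central URL for flink-s3-fs-hadoop (derived from the standard repository layout, since the link above points at a listing page) are all assumptions rather than anything prescribed by Hudi or KDA.

```python
import posixpath
import urllib.request
from urllib.parse import urlparse

JAR_URLS = [
    # Assumed direct download URL, following the usual Maven Central layout.
    "https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.13.0/"
    "flink-s3-fs-hadoop-1.13.0.jar",
    # URL taken from the list above.
    "https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.10.1/"
    "hudi-flink-bundle_2.12-0.10.1.jar",
]


def s3_key_for(url, prefix="jars/"):
    """Derive the S3 object key from the JAR file name at the end of the URL."""
    filename = posixpath.basename(urlparse(url).path)
    return prefix + filename


def upload_jars(s3_client, bucket, urls=JAR_URLS, prefix="jars/"):
    """Download each JAR to a temporary file and upload it to the bucket."""
    keys = []
    for url in urls:
        key = s3_key_for(url, prefix)
        local_path, _headers = urllib.request.urlretrieve(url)
        s3_client.upload_file(local_path, bucket, key)
        keys.append(key)
    return keys


# Usage (hypothetical bucket name):
#   import boto3
#   upload_jars(boto3.client("s3"), "my-kda-jars-bucket")
```

The returned keys are the S3 paths to pick when adding the custom JARs to the KDA application in the next step.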
# Step 2: Create KDA Application
![image](https://user-images.githubusercontent.com/39345855/211200378-71b50563-4ca8-4c57-afbc-e30ee86ad9d1.png)
### Provide Name to KDA APP
![image](https://user-images.githubusercontent.com/39345855/211200437-1af10bac-b2dd-40e6-9686-7c41c2a092c7.png)
### Select the Glue database
![image](https://user-images.githubusercontent.com/39345855/211200482-49b07851-35fd-4260-9426-691b324becad.png)
#### NOTE: I have given Admin access, so there should not be a problem with permissions
### Add the custom JARs we downloaded
![image](https://user-images.githubusercontent.com/39345855/211200540-58facbee-50ec-4ca3-abb8-3f6c5eec7fbd.png)
![image](https://user-images.githubusercontent.com/39345855/211200599-7a362e7c-b4f8-422d-a18d-62566fd0ed58.png)
![image](https://user-images.githubusercontent.com/39345855/211200607-f2ec629b-3adf-437b-bc6e-9807c696711f.png)
# Step 3: Launch Zeppelin Notebooks
![image](https://user-images.githubusercontent.com/39345855/211200880-6eb1d210-adcb-4a58-98a2-d3e4fabcec4e.png)
# Step 4: Create a Kinesis Stream called stock-streams
![image](https://user-images.githubusercontent.com/39345855/211201204-f38b196e-6080-4831-aebb-72feb529ac38.png)
##### Add some data into streams
```
import datetime
import json
import os
import random
import time
import uuid

import boto3
from dotenv import load_dotenv

# Load DEV_ACCESS_KEY / DEV_SECRET_KEY from a local .env file
load_dotenv(".env")

# Create the Kinesis client once, outside the send loop
kinesis_client = boto3.client(
    'kinesis',
    region_name='us-west-2',
    aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("DEV_SECRET_KEY")
)


def getReferrer():
    """Build one random stock tick record."""
    data = {}
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['uuid'] = str(uuid.uuid4())
    data['event_time'] = str_now
    data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
    price = random.random() * 100
    data['price'] = round(price, 2)
    return data


# Send one record to the stream every two seconds
while True:
    data = json.dumps(getReferrer())
    res = kinesis_client.put_record(
        StreamName="stock-streams",
        Data=data,
        PartitionKey="1")
    print(data, " ", res)
    time.sleep(2)
```
![image](https://user-images.githubusercontent.com/39345855/211201231-b92861c9-4fc0-480a-9312-f966d4c0d81f.png)
# Step 5: Let's try a hello world on HUDI
```
%flink.ssql(type=update)
DROP TABLE if exists stock_table;
CREATE TABLE stock_table (
uuid varchar,
ticker VARCHAR,
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = 'stock-streams',
'aws.region' = 'us-west-2',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
```
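Since the JSON keys written by the producer have to line up with the column names declared in the table above, a quick local check can rule out a schema mismatch before looking elsewhere. The sketch below is only an illustration: `check_record` and `EXPECTED_COLUMNS` are hypothetical names, and the timestamp test only confirms that Python can parse the value as ISO-8601, which does not guarantee that Flink's JSON format accepts it.

```python
import datetime
import json

# Column names declared in the stock_table DDL above.
EXPECTED_COLUMNS = {"uuid", "ticker", "price", "event_time"}


def check_record(raw_json):
    """Return a list of human-readable problems found in one JSON record."""
    record = json.loads(raw_json)
    problems = []

    missing = EXPECTED_COLUMNS - set(record)
    extra = set(record) - EXPECTED_COLUMNS
    if missing:
        problems.append(f"missing fields: {sorted(missing)}")
    if extra:
        problems.append(f"unexpected fields: {sorted(extra)}")

    # Only report a timestamp problem when the field is present at all.
    if "event_time" in record:
        try:
            datetime.datetime.fromisoformat(record["event_time"])
        except (TypeError, ValueError):
            problems.append("event_time is not an ISO-8601 timestamp")

    return problems


sample = json.dumps({
    "uuid": "example-id",
    "ticker": "AAPL",
    "price": 12.34,
    "event_time": datetime.datetime.now().isoformat(),
})
print(check_record(sample))
```

An empty list means the record has exactly the declared fields and a parseable `event_time`.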
![image](https://user-images.githubusercontent.com/39345855/211201321-3c2366dc-ddf0-4487-a925-38922a942f43.png)
![image](https://user-images.githubusercontent.com/39345855/211201361-446728c9-833e-4708-9b9d-c39ea650392d.png)
```
%flink.ssql(type=update)
DROP TABLE if exists stock_table_hudi;
CREATE TABLE stock_table_hudi(
uuid varchar ,
ticker VARCHAR,
price DOUBLE,
event_time TIMESTAMP(3),
PRIMARY KEY (`uuid`) NOT Enforced
)
WITH (
'connector' = 'hudi',
'path' = 's3://soumilshah-hudi-demos/tmp/',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);
```
#### INSERT Into HUDI
```
%flink.ssql(type=update)
INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time from stock_table;
```
# Error Messages
```
java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time from stock_table'.
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time from stock_table'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/10.100.42.225:8082
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)
```
Please let me know if you need any other details. I have listed all the steps, which should help reproduce and fix the problem. I'm happy to hop on a call or meeting to walk through the steps too. Email: shahsoumil519@gmail.com
[GitHub] [hudi] soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 closed issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
URL: https://github.com/apache/hudi/issues/7591
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1376687802
@soumilshah1995 I tried again, it worked as below, for your reference. Thanks~
**Step 1 – Kinesis Stream**
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456528-eb52b665-05aa-4cca-a44f-30b0d1948ab8.png">
**Step 2 – Jars**
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456562-92dcd98e-9964-4db9-be71-5db6e9b3ef40.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456572-986a2a4a-8b2e-47bd-a205-72e265d359ae.png">
**Step 3 – KDA Studio**
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456602-8813ca24-4b81-4b41-bcf6-c4cf31f64556.png">
**Step 4 – Executing the code**
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456628-08f24462-54f1-453a-b46c-80c767752c25.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456638-d1f6d477-4ec2-4d6d-bcfd-4d53581e1ad4.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456650-7e0f8a22-83d0-456f-a309-2d46cadecd89.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456661-f2f95408-6bf8-4361-9e3d-d50974728b78.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456668-9a1ec0c1-bce5-4cbe-8607-2a98d4527741.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456684-dfb720af-1cd5-4da9-ac2a-12c28283d4ee.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456694-6b23055c-bba0-4077-b2de-bb21013448ce.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456708-bb8bc494-a0c8-438d-9d92-8e537023c6d3.png">
<img width="468" alt="image" src="https://user-images.githubusercontent.com/14228056/211456716-9c4825e7-05a7-4d35-a4ef-8f966d39056e.png">
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1376707180
From the logs _java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082_, it seems that either the Flink cluster was not started at all, or it might be a network issue.
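With traces as long as the one posted earlier, the useful part is usually the last `Caused by:` line. As a small illustration of how to pull that out, here is a sketch of a helper; `root_cause` is a hypothetical name and the sample trace is abbreviated from the error message in this thread.

```python
def root_cause(stack_trace):
    """Return the innermost cause of a Java stack trace.

    The innermost cause is the text after the last "Caused by:" line. If the
    trace has no such line, the first non-empty line is returned instead.
    """
    cause = None
    first_line = None
    for line in stack_trace.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        if first_line is None:
            first_line = stripped
        if stripped.startswith("Caused by:"):
            cause = stripped[len("Caused by:"):].strip()
    return cause if cause is not None else first_line


# Abbreviated from the trace posted in this thread.
trace = """java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job
 at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
 at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
Caused by: java.net.ConnectException: Connection refused
 at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
"""
print(root_cause(trace))
```

Here the innermost cause is a refused connection, which points at the cluster or the network rather than at the Hudi table definition.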
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1369696915
Hey Danny,
Yes, I tried that yesterday but could not get it to work.
I keep getting the same error message.
On Tue, Jan 3, 2023 at 12:08 AM Danny Chan ***@***.***> wrote:
> I see you use the streaming execution mode for VALUES SQL statement, did
> you try the batch execution mode instead then ?
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1373625630
Hi Danny, I do not have logs; I am using the AWS Kinesis managed service for Flink.
I have provided all the necessary steps, so the team can replicate the error if needed.
Please let me know if you need any further information.
[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375058501
Hello @umehrot2, sorry to bother you. Is there any chance you can help validate this? It seems this customer is from AWS.
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382490644
@davidshtian what are the recommended settings for querying the data in Athena, i.e. Hive sync?
```
%flink.ssql(type=update)
DROP TABLE if exists orders;
CREATE TABLE orders(
orderid VARCHAR PRIMARY KEY NOT ENFORCED,
customer_id VARCHAR,
ts TIMESTAMP(3),
order_value DOUBLE,
priority VARCHAR
)
WITH (
'connector' = 'hudi',
'path' = 's3a://soumilshah-hudi-demos/tmp/',
'table.type' = 'COPY_ON_WRITE' ,
'hoodie.embed.timeline.server' = 'false',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms'
);
```
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1380372534
I'm trying this today; let's hope for the best.
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1382771434
You are right, man.
I created the table in Athena manually using a DDL statement.
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375225426
Or lots of similar logs like this "RemoteTransportException: Connection unexpectedly closed by remote task manager '142.151.160.20/142.151.160.20:6121'. This might indicate that the remote task manager was lost.
This may indicate that the taskmanagers are overloaded."
```
"applicationARN": "arn:aws:kinesisanalytics:us-east-1:xxx:application/kda-studio-david",
"applicationVersionId": "19",
"locationInformation": "org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:163)",
"logger": "org.apache.zeppelin.flink.FlinkSqlInterrpeter",
"message": "Fail to run sql:INSERT INTO t1 SELECT uuid, event_time, ticker,price from stock_table",
"messageSchemaVersion": "1",
"messageType": "ERROR",
"threadName": "ParallelScheduler-Worker-9",
"throwableInformation": "java.io.IOException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3e6e3e07934084f5fafc542a4a91bc2c)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744
at org.apache.zeppelin.scheduler.Job.run(Job.java:172
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628
at java.base/java.lang.Thread.run(Thread.java:829
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3e6e3e07934084f5fafc542a4a91bc2c
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:125
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478
... 3 mor
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:123
... 23 mor
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrateg
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:222
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:718
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443
at jdk.internal.reflect.GeneratedMethodAccessor100.invoke(Unknown Source
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43
at java.base/java.lang.reflect.Method.invoke(Method.java:566
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172
at akka.actor.Actor.aroundReceive(Actor.scala:517
at akka.actor.Actor.aroundReceive$(Actor.scala:515
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592
at akka.actor.ActorCell.invoke(ActorCell.scala:561
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258
at akka.dispatch.Mailbox.run(Mailbox.scala:225
at akka.dispatch.Mailbox.exec(Mailbox.scala:235
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '142.151.160.20/142.151.160.20:6121'. This might indicate that the remote task manager was lost
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1106)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)
"
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1381518382
@soumilshah1995
>i am going to work release a nice tutorial for community on Flink and hudi and hope
What great news! Welcome aboard the Hudi community. There are many smart, warm people here, and I think you can make the community even better.
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1381626839
@soumilshah1995 Is it related to the `s3` vs. `s3a` URI scheme?
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1380564963
Dear @davidshtian
I wanted to take a moment to thank you for the help you provided me. Your assistance was extremely valuable and I appreciate your willingness to lend a hand. Your support is greatly appreciated and I'm thankful for your help.
I was able to resolve this issue. I am going to release a nice tutorial for the community on Flink and Hudi, and I hope it helps everyone.
Thanks again for your help.
Sincerely,
shah
[GitHub] [hudi] danny0405 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
danny0405 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1373214655
I don't see any Hudi-related error in the stack trace, so it is hard to locate the cause of the problem. There is a `Connection refused` error thrown from Netty, so the TaskManager may have crashed. Can you check the TM log file for errors, or the JM log?
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1374699427
> @davidshtian
@soumilshah1995 Have you tried the 1.13.2 version of the package, _flink-s3-fs-hadoop-1.13.2.jar_? KDA [supports Apache Flink version 1.13.2](https://docs.aws.amazon.com/kinesisanalytics/latest/java/doc-history.html). Thanks~
[GitHub] [hudi] soumilshah1995 commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
soumilshah1995 commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375695222
### Thank you @davidshtian, here are my results based on your feedback
I am now trying this with the following changes:
* flink-s3-fs-hadoop-1.13.2
* hudi-flink-bundle_2.12-0.10.1.jar
# Attempt 3
# Step 1: Made the Stream Again
![image](https://user-images.githubusercontent.com/39345855/211321796-b46d6b5f-d981-4e3e-b0f5-e27c07f3ae63.png)
# Step 2: Uploaded the JARs to S3 as Suggested by David
![image](https://user-images.githubusercontent.com/39345855/211322361-dcff2d61-0665-4021-ae70-86a2099e2ac6.png)
# Step 3: Creating KDA
![image](https://user-images.githubusercontent.com/39345855/211322739-d3b682d4-1348-4478-9589-f42c846e437b.png)
# Step 4: Executing Code
![image](https://user-images.githubusercontent.com/39345855/211327790-41c2bfcf-7de9-44bc-9e7c-35f95c9ebcde.png)
# Same Error
```
java.io.IOException: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time from stock_table where uuid is NOT NULL'.
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'INSERT INTO stock_table_hudi
SELECT uuid, ticker, price, event_time from stock_table where uuid is NOT NULL'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
at org.apache.flink.api.java.ScalaShellStreamEnvironment.executeAsync(ScalaShellStreamEnvironment.java:73)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1461)
at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
... 14 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... 1 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1063)
... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: zeppelin-flink/172.20.189.165:8082
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)
```
[GitHub] [hudi] davidshtian commented on issue #7591: [SUPPORT] Kinesis Data Analytics Flink1.13 to HUDI
Posted by GitBox <gi...@apache.org>.
davidshtian commented on issue #7591:
URL: https://github.com/apache/hudi/issues/7591#issuecomment-1375398513
@soumilshah1995 For your reference: in order to write data to the Hudi sink on S3, _checkpointing_ needs to be enabled (restart the interpreter, then execute).
```
%flink.conf
execution.checkpointing.interval 1000
```
I also added the option below to the Hudi table to disable the embedded timeline server. Otherwise, errors like _"Caused by: org.apache.hudi.exception.HoodieRemoteException: Connect to 142.151.165.206:37851 [/142.151.165.206] failed: Connection timed out (Connection timed out)"_ are thrown (observed in my own tests).
```
'hoodie.embed.timeline.server' = 'false'
```
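Putting the two settings together, a minimal sketch of the sink definition might look like the paragraph below. The table name, columns, and path are copied from the DDL at the top of this issue. The `PRIMARY KEY` clause is an assumption added here and is not part of the original DDL: the Hudi Flink connector generally expects a record key, and this table has no `uuid` column to fall back on. Whether the path should use `s3://` or `s3a://` is not settled in this thread, so the original value is kept.

```sql
%flink.ssql
-- Sketch only. Assumes checkpointing was already enabled through the
-- %flink.conf paragraph above and the interpreter has been restarted.
-- PRIMARY KEY (orderid) is an assumption added for illustration.
DROP TABLE IF EXISTS tbl_orders_hudi;
CREATE TABLE tbl_orders_hudi (
  orderid VARCHAR,
  customer_id VARCHAR,
  order_value DOUBLE,
  PRIMARY KEY (orderid) NOT ENFORCED
)
WITH (
  'connector' = 'hudi',
  'path' = 's3://glue-learn-begineers/tmp/',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.embed.timeline.server' = 'false'
);
```

The `INSERT INTO tbl_orders_hudi SELECT ...` paragraph from the original report can then be run unchanged against this table.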
Thanks~