You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小昌同学 <cc...@163.com> on 2023/05/15 10:29:15 UTC
回复:报错显示为bug
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//使用侧输出流
OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
};
OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
};
//1、连接测试环境kafka的数据
String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName = FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBAAAAwACABYAAACuAgAAAAAAAAAAAACuAgAATQAAAAFIAAAAAAFSMAAAADEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");
String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});
//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
//4、根据不同的tag进行不同的输出流设定
DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);
requestDataStream.print("requestDataStream");
answerDataStream.print("answerDataStream");
//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
//5.1 先对请求流进行处理
SingleOutputStreamOperator<OutInfo> outRequestDataStream = requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, LocalDateTime.now().toString());
}
});
outRequestDataStream.print("outRequestDataStream");
//5.2 对应答流进行处理
SingleOutputStreamOperator<OutInfo> outAnswerDataStream = answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, LocalDateTime.now().toString());
}
});
outAnswerDataStream.print("outAnswerDataStream");
//6、讲两条流转换为对应的表 进行关联取最小值
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//设置状态值为1天
tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
// tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
// tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
.column("funcId", DataTypes.STRING())
.column("serverIp", DataTypes.STRING())
.column("outTime", DataTypes.BIGINT())
.column("handleSerialNo", DataTypes.STRING())
.column("info", DataTypes.STRING())
.column("funcIdDesc", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
.build());
Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream,Schema.newBuilder()
.column("funcId", DataTypes.STRING())
.column("serverIp", DataTypes.STRING())
.column("outTime", DataTypes.BIGINT())
.column("handleSerialNo", DataTypes.STRING())
.column("info", DataTypes.STRING())
.column("funcIdDesc", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
.build());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.eventTime as eventTime\n" +
" from "+ tableRequest +" a\n " +
" inner join "+ tableAnswer +" b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("这个是resultTable"+result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());
tableEnv.createTemporaryView("midTable",midTable);
//使用TVF的采用渐进式累计窗口进行计算
Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable "+
" , DESCRIPTOR(eventTime)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
DataStream<Tuple2<Boolean, ResultInfo>> resultStream = tableEnv.toRetractStream(resulTable, ResultInfo.class);
resultStream.print("resultStream");
SingleOutputStreamOperator<ResultInfo> resultInfoStream = resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
@Override
public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws Exception {
return value.f1;
}
});
resultInfoStream.print("resultInfoStream");
resultInfoStream.addSink(new Sink2Mysql());
env.execute();
}
}
|
你好,以上是我的代码,相关报错如下;
| 这个是resultTableUnnamedTable$2
(
`funcId` STRING,
`funcIdDesc` STRING,
`serverIp` STRING,
`maxTime` BIGINT,
`minTime` BIGINT,
`pk` STRING,
`eventTime` TIMESTAMP(3) *ROWTIME*
)
/* 1 */public class bean$OutInfo$2$Converter implements org.apache.flink.table.data.conversion.DataStructureConverter {
/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters;
/* 3 */ private final org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
/* 4 */ public bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters) {
/* 5 */ this.fieldGetters = fieldGetters;
/* 6 */ this.fieldConverters = fieldConverters;
/* 7 */ }
/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = new org.apache.flink.table.data.GenericRowData(7);
/* 11 */ genericRow.setField(0, fieldConverters[0].toInternalOrNull(((java.lang.String) external.getFuncId())));
/* 12 */ genericRow.setField(1, fieldConverters[1].toInternalOrNull(((java.lang.String) external.getServerIp())));
/* 13 */ genericRow.setField(2, fieldConverters[2].toInternalOrNull(((java.lang.Long) external.getOutTime())));
/* 14 */ genericRow.setField(3, fieldConverters[3].toInternalOrNull(((java.lang.String) external.getHandleSerialNo())));
/* 15 */ genericRow.setField(4, fieldConverters[4].toInternalOrNull(((java.lang.String) external.getInfo())));
/* 16 */ genericRow.setField(5, fieldConverters[5].toInternalOrNull(((java.lang.String) external.getFuncIdDesc())));
/* 17 */ genericRow.setField(6, fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) external.getEventTime())));
/* 18 */ return genericRow;
/* 19 */ }
/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
/* 21 */ final org.apache.flink.table.data.RowData internal = (org.apache.flink.table.data.RowData) o;
/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
/* 23 */ structured.setFuncId(((java.lang.String) fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
/* 24 */ structured.setServerIp(((java.lang.String) fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
/* 25 */ structured.setOutTime(((java.lang.Long) fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
/* 26 */ structured.setHandleSerialNo(((java.lang.String) fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
/* 27 */ structured.setInfo(((java.lang.String) fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
/* 28 */ structured.setFuncIdDesc(((java.lang.String) fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
/* 29 */ structured.setEventTime(((java.lang.String) fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
/* 30 */ return structured;
/* 31 */ }
/* 32 */}
/* 1 */public class bean$OutInfo$2$Converter implements org.apache.flink.table.data.conversion.DataStructureConverter {
/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters;
/* 3 */ private final org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
/* 4 */ public bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters) {
/* 5 */ this.fieldGetters = fieldGetters;
/* 6 */ this.fieldConverters = fieldConverters;
/* 7 */ }
/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = new org.apache.flink.table.data.GenericRowData(7);
/* 11 */ genericRow.setField(0, fieldConverters[0].toInternalOrNull(((java.lang.String) external.getFuncId())));
/* 12 */ genericRow.setField(1, fieldConverters[1].toInternalOrNull(((java.lang.String) external.getServerIp())));
/* 13 */ genericRow.setField(2, fieldConverters[2].toInternalOrNull(((java.lang.Long) external.getOutTime())));
/* 14 */ genericRow.setField(3, fieldConverters[3].toInternalOrNull(((java.lang.String) external.getHandleSerialNo())));
/* 15 */ genericRow.setField(4, fieldConverters[4].toInternalOrNull(((java.lang.String) external.getInfo())));
/* 16 */ genericRow.setField(5, fieldConverters[5].toInternalOrNull(((java.lang.String) external.getFuncIdDesc())));
/* 17 */ genericRow.setField(6, fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) external.getEventTime())));
/* 18 */ return genericRow;
/* 19 */ }
/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
/* 21 */ final org.apache.flink.table.data.RowData internal = (org.apache.flink.table.data.RowData) o;
/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
/* 23 */ structured.setFuncId(((java.lang.String) fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
/* 24 */ structured.setServerIp(((java.lang.String) fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
/* 25 */ structured.setOutTime(((java.lang.Long) fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
/* 26 */ structured.setHandleSerialNo(((java.lang.String) fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
/* 27 */ structured.setInfo(((java.lang.String) fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
/* 28 */ structured.setFuncIdDesc(((java.lang.String) fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
/* 29 */ structured.setEventTime(((java.lang.String) fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
/* 30 */ return structured;
/* 31 */ }
/* 32 */}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 4 more
Caused by: org.apache.flink.table.api.TableException: Error while generating structured type converter.
at org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46)
at org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
at org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
... 12 more
Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
... 13 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
... 16 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
... 22 more
Process finished with exit code 1
|
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | lxk<lx...@163.com> |
| 发送日期 | 2023年5月15日 18:21 |
| 收件人 | <us...@flink.apache.org> |
| 主题 | Re:报错显示为bug |
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。
在 2023-05-15 17:11:42,"小昌同学" <cc...@163.com> 写道:
各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. “
flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作
| |
小昌同学
|
|
ccc0606fighting@163.com
|
回复: 回复:报错显示为bug
Posted by 小昌同学 <cc...@163.com>.
好滴呀 谢谢各位老师
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | Shammon FY<zj...@gmail.com> |
| 发送日期 | 2023年5月16日 08:46 |
| 收件人 | <us...@flink.apache.org> ,
<cc...@163.com> |
| 主题 | Re: 回复:报错显示为bug |
Hi,
从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段
Best,
Shammon FY
On Mon, May 15, 2023 at 7:29 PM lxk <lx...@163.com> wrote:
你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
At 2023-05-15 18:29:15, "小昌同学" <cc...@163.com> wrote:
|
package job;
import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;
public class RytLogAnly4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//使用侧输出流
OutputTag<BaseInfo> requestStream = new
OutputTag<BaseInfo>("requestStream") {
};
OutputTag<BaseInfo> answerStream = new
OutputTag<BaseInfo>("answerStream") {
};
//1、连接测试环境kafka的数据
String servers =
FlinkConfig.config.getProperty("dev_bootstrap.servers");
String topicName =
FlinkConfig.config.getProperty("dev_topicName");
String groupId = FlinkConfig.config.getProperty("dev_groupId");
String devMode = FlinkConfig.config.getProperty("dev_mode");
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", servers);
prop.setProperty("group.id", groupId);
prop.setProperty("auto.offset.reset", devMode);
DataStreamSource<String> sourceStream = env.addSource(new
FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
<315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBAAAAwACABYAAACuAgAAAAAAAAAAAACuAgAATQAAAAFIAAAAAAFSMAAAADEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
//2、对源数据进行处理,生成baseInfo基类的数据
SingleOutputStreamOperator<BaseInfo> baseInfoStream =
sourceStream.map(new MapFunction<String, BaseInfo>() {
@Override
public BaseInfo map(String value) throws Exception {
JSONObject jsonObject = JSON.parseObject(value);
//获取到不同的服务器IP
String serverIp = jsonObject.getString("ip");
//获取到不同的data的数据
String datas = jsonObject.getString("data");
String[] splits = datas.split("\n");
HashMap<String, String> dataMap = new HashMap<>();
//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
String time = splits[0].substring(7, 19);
//将subData填充到自定义类型中,用来判断时请求还是应答
String subData = datas.substring(0, 10);
for (int i = 0; i < splits.length; i++) {
if (splits[i].contains("=")) {
splits[i] = splits[i].replaceFirst("=", "&");
String[] temp = splits[i].split("&");
if (temp.length > 1) {
dataMap.put(temp[0].toLowerCase(), temp[1]);
}
}
}
return new BaseInfo(dataMap.get("action"), serverIp,
DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
}
});
//3、使用process方法进行baseInfoStream流切割
SingleOutputStreamOperator<BaseInfo> tagStream =
baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
//4、根据不同的tag进行不同的输出流设定
DataStream<BaseInfo> requestDataStream =
tagStream.getSideOutput(requestStream);
DataStream<BaseInfo> answerDataStream =
tagStream.getSideOutput(answerStream);
requestDataStream.print("requestDataStream");
answerDataStream.print("answerDataStream");
//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
//5.1 先对请求流进行处理
SingleOutputStreamOperator<OutInfo> outRequestDataStream =
requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType
where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(),
value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
value.getInfo(), actionName,DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(),
value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
value.getInfo(), actionName, LocalDateTime.now().toString());
}
});
outRequestDataStream.print("outRequestDataStream");
//5.2 对应答流进行处理
SingleOutputStreamOperator<OutInfo> outAnswerDataStream =
answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
@Override
public OutInfo map(BaseInfo value) throws Exception {
//拿到数据中携带的数字的action
String actionType = value.getFuncId();
System.out.println(actionType);
String actionName = null;
Connection connection = null;
PreparedStatement ps = null;
//根据数据的action去MySQL中查找到对应的中午注释
try {
String sql = "select action_name from ActionType
where action = ?";
connection = JdbcUtil.getConnection();
ps = connection.prepareStatement(sql);
ps.setString(1, actionType);
ResultSet resultSet = ps.executeQuery();
System.out.println("resultSet是" + resultSet);
if (resultSet.next()) {
actionName = resultSet.getString("action_name");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
JdbcUtil.closeResource(connection, ps);
}
// return new OutInfo(value.getFuncId(),
value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
value.getInfo(), actionName, DateUtils.format(new Date()));
return new OutInfo(value.getFuncId(),
value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
value.getInfo(), actionName, LocalDateTime.now().toString());
}
});
outAnswerDataStream.print("outAnswerDataStream");
//6、讲两条流转换为对应的表 进行关联取最小值
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
//设置状态值为1天
tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
// tableEnv.createTemporaryView("tableRequest",
outRequestDataStream);
// tableEnv.createTemporaryView("tableAnswer",
outAnswerDataStream);
Table tableRequest =
tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
.column("funcId", DataTypes.STRING())
.column("serverIp", DataTypes.STRING())
.column("outTime", DataTypes.BIGINT())
.column("handleSerialNo", DataTypes.STRING())
.column("info", DataTypes.STRING())
.column("funcIdDesc", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.watermark("eventTime", "eventTime - INTERVAL '5' SECOND
")
.build());
Table tableAnswer =
tableEnv.fromDataStream(outAnswerDataStream,Schema.newBuilder()
.column("funcId", DataTypes.STRING())
.column("serverIp", DataTypes.STRING())
.column("outTime", DataTypes.BIGINT())
.column("handleSerialNo", DataTypes.STRING())
.column("info", DataTypes.STRING())
.column("funcIdDesc", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.watermark("eventTime", "eventTime - INTERVAL '5' SECOND
")
.build());
Table result = tableEnv.sqlQuery("select \n" +
"\ta.funcId as funcId ,\n" +
"\ta.funcIdDesc as funcIdDesc,\n" +
"\ta.serverIp as serverIp,\n" +
"\tb.outTime as maxTime,\n" +
"\ta.outTime as minTime,\t\n" +
"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
" a.eventTime as eventTime\n" +
" from "+ tableRequest +" a\n " +
" inner join "+ tableAnswer +" b" +
" on a.handleSerialNo=b.handleSerialNo ");
System.out.println("这个是resultTable"+result);
result.printSchema();
tableEnv.createTemporaryView("resultTable", result);
DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
MidInfo.class);
Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
$("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
$("eventTime").rowtime());
tableEnv.createTemporaryView("midTable",midTable);
//使用TVF的采用渐进式累计窗口进行计算
Table resulTable = tableEnv.sqlQuery("SELECT
funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
"FROM TABLE(CUMULATE(\n" +
" TABLE midTable "+
" , DESCRIPTOR(eventTime)\n" +
" , INTERVAL '60' SECOND\n" +
" , INTERVAL '1' DAY))\n" +
" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
DataStream<Tuple2<Boolean, ResultInfo>> resultStream =
tableEnv.toRetractStream(resulTable, ResultInfo.class);
resultStream.print("resultStream");
SingleOutputStreamOperator<ResultInfo> resultInfoStream =
resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>()
{
@Override
public ResultInfo map(Tuple2<Boolean, ResultInfo> value)
throws Exception {
return value.f1;
}
});
resultInfoStream.print("resultInfoStream");
resultInfoStream.addSink(new Sink2Mysql());
env.execute();
}
}
|
你好,以上是我的代码,相关报错如下;
| 这个是resultTableUnnamedTable$2
(
`funcId` STRING,
`funcIdDesc` STRING,
`serverIp` STRING,
`maxTime` BIGINT,
`minTime` BIGINT,
`pk` STRING,
`eventTime` TIMESTAMP(3) *ROWTIME*
)
/* 1 */public class bean$OutInfo$2$Converter implements
org.apache.flink.table.data.conversion.DataStructureConverter {
/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[]
fieldGetters;
/* 3 */ private final
org.apache.flink.table.data.conversion.DataStructureConverter[]
fieldConverters;
/* 4 */ public
bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[]
fieldGetters,
org.apache.flink.table.data.conversion.DataStructureConverter[]
fieldConverters) {
/* 5 */ this.fieldGetters = fieldGetters;
/* 6 */ this.fieldConverters = fieldConverters;
/* 7 */ }
/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow =
new org.apache.flink.table.data.GenericRowData(7);
/* 11 */ genericRow.setField(0,
fieldConverters[0].toInternalOrNull(((java.lang.String)
external.getFuncId())));
/* 12 */ genericRow.setField(1,
fieldConverters[1].toInternalOrNull(((java.lang.String)
external.getServerIp())));
/* 13 */ genericRow.setField(2,
fieldConverters[2].toInternalOrNull(((java.lang.Long)
external.getOutTime())));
/* 14 */ genericRow.setField(3,
fieldConverters[3].toInternalOrNull(((java.lang.String)
external.getHandleSerialNo())));
/* 15 */ genericRow.setField(4,
fieldConverters[4].toInternalOrNull(((java.lang.String)
external.getInfo())));
/* 16 */ genericRow.setField(5,
fieldConverters[5].toInternalOrNull(((java.lang.String)
external.getFuncIdDesc())));
/* 17 */ genericRow.setField(6,
fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime)
external.getEventTime())));
/* 18 */ return genericRow;
/* 19 */ }
/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
/* 21 */ final org.apache.flink.table.data.RowData internal =
(org.apache.flink.table.data.RowData) o;
/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
/* 23 */ structured.setFuncId(((java.lang.String)
fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
/* 24 */ structured.setServerIp(((java.lang.String)
fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
/* 25 */ structured.setOutTime(((java.lang.Long)
fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
/* 26 */ structured.setHandleSerialNo(((java.lang.String)
fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
/* 27 */ structured.setInfo(((java.lang.String)
fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
/* 28 */ structured.setFuncIdDesc(((java.lang.String)
fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
/* 29 */ structured.setEventTime(((java.lang.String)
fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
/* 30 */ return structured;
/* 31 */ }
/* 32 */}
/* 1 */public class bean$OutInfo$2$Converter implements
org.apache.flink.table.data.conversion.DataStructureConverter {
/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[]
fieldGetters;
/* 3 */ private final
org.apache.flink.table.data.conversion.DataStructureConverter[]
fieldConverters;
/* 4 */ public
bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[]
fieldGetters,
org.apache.flink.table.data.conversion.DataStructureConverter[]
fieldConverters) {
/* 5 */ this.fieldGetters = fieldGetters;
/* 6 */ this.fieldConverters = fieldConverters;
/* 7 */ }
/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow =
new org.apache.flink.table.data.GenericRowData(7);
/* 11 */ genericRow.setField(0,
fieldConverters[0].toInternalOrNull(((java.lang.String)
external.getFuncId())));
/* 12 */ genericRow.setField(1,
fieldConverters[1].toInternalOrNull(((java.lang.String)
external.getServerIp())));
/* 13 */ genericRow.setField(2,
fieldConverters[2].toInternalOrNull(((java.lang.Long)
external.getOutTime())));
/* 14 */ genericRow.setField(3,
fieldConverters[3].toInternalOrNull(((java.lang.String)
external.getHandleSerialNo())));
/* 15 */ genericRow.setField(4,
fieldConverters[4].toInternalOrNull(((java.lang.String)
external.getInfo())));
/* 16 */ genericRow.setField(5,
fieldConverters[5].toInternalOrNull(((java.lang.String)
external.getFuncIdDesc())));
/* 17 */ genericRow.setField(6,
fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime)
external.getEventTime())));
/* 18 */ return genericRow;
/* 19 */ }
/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
/* 21 */ final org.apache.flink.table.data.RowData internal =
(org.apache.flink.table.data.RowData) o;
/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
/* 23 */ structured.setFuncId(((java.lang.String)
fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
/* 24 */ structured.setServerIp(((java.lang.String)
fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
/* 25 */ structured.setOutTime(((java.lang.Long)
fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
/* 26 */ structured.setHandleSerialNo(((java.lang.String)
fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
/* 27 */ structured.setInfo(((java.lang.String)
fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
/* 28 */ structured.setFuncIdDesc(((java.lang.String)
fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
/* 29 */ structured.setEventTime(((java.lang.String)
fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
/* 30 */ return structured;
/* 31 */ }
/* 32 */}
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
by NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 4 more
Caused by: org.apache.flink.table.api.TableException: Error while
generating structured type converter.
at
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
at
org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46)
at
org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException:
org.apache.flink.api.common.InvalidProgramException: Table program cannot
be compiled. This is a bug. Please file an issue.
at
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
at
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
... 12 more
Caused by:
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
org.apache.flink.api.common.InvalidProgramException: Table program cannot
be compiled. This is a bug. Please file an issue.
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
at
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
... 13 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table
program cannot be compiled. This is a bug. Please file an issue.
at
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
at
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
... 16 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 17,
Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
at
org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
at
org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
at
org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
at
org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
at
org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
at
org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
... 22 more
Process finished with exit code 1
|
| |
小昌同学
|
|
ccc0606fighting@163.com
|
---- 回复的原邮件 ----
| 发件人 | lxk<lx...@163.com> |
| 发送日期 | 2023年5月15日 18:21 |
| 收件人 | <us...@flink.apache.org> |
| 主题 | Re:报错显示为bug |
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。
在 2023-05-15 17:11:42,"小昌同学" <cc...@163.com> 写道:
各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by:
org.apache.flink.api.common.InvalidProgramException: Table program cannot
be compiled. This is a bug. Please file an issue. “
flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作
| |
小昌同学
|
|
ccc0606fighting@163.com
|
Re: 回复:报错显示为bug
Posted by Shammon FY <zj...@gmail.com>.
Hi,
从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段
Best,
Shammon FY
On Mon, May 15, 2023 at 7:29 PM lxk <lx...@163.com> wrote:
> 你好,从报错来看是类型不兼容导致的。
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-05-15 18:29:15, "小昌同学" <cc...@163.com> wrote:
> >|
> >package job;
> >import bean.BaseInfo;
> >import bean.MidInfo;
> >import bean.OutInfo;
> >import bean.ResultInfo;
> >import com.alibaba.fastjson.JSON;
> >import com.alibaba.fastjson.JSONObject;
> >import config.FlinkConfig;
> >import function.MyProcessFunction;
> >import org.apache.flink.api.common.functions.MapFunction;
> >import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >import org.apache.flink.api.java.tuple.Tuple2;
> >import org.apache.flink.streaming.api.TimeCharacteristic;
> >import org.apache.flink.streaming.api.datastream.DataStream;
> >import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >import org.apache.flink.table.api.DataTypes;
> >import org.apache.flink.table.api.Schema;
> >import org.apache.flink.table.api.Table;
> >import org.apache.flink.table.api.TableSchema;
> >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >import org.apache.flink.table.types.DataType;
> >import org.apache.flink.util.OutputTag;
> >import sink.Sink2Mysql;
> >import utils.DateUtil;
> >import utils.DateUtils;
> >import utils.JdbcUtil;
> >
> >import java.sql.Connection;
> >import java.sql.PreparedStatement;
> >import java.sql.ResultSet;
> >import java.time.*;
> >import java.util.Date;
> >import java.util.HashMap;
> >import java.util.Properties;
> >
> >public class RytLogAnly4 {
> >public static void main(String[] args) throws Exception {
> > StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >//使用侧输出流
> > OutputTag<BaseInfo> requestStream = new
> OutputTag<BaseInfo>("requestStream") {
> > };
> > OutputTag<BaseInfo> answerStream = new
> OutputTag<BaseInfo>("answerStream") {
> > };
> >
> >//1、连接测试环境kafka的数据
> > String servers =
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
> > String topicName =
> FlinkConfig.config.getProperty("dev_topicName");
> > String groupId = FlinkConfig.config.getProperty("dev_groupId");
> > String devMode = FlinkConfig.config.getProperty("dev_mode");
> > Properties prop = new Properties();
> > prop.setProperty("bootstrap.servers", servers);
> > prop.setProperty("group.id", groupId);
> > prop.setProperty("auto.offset.reset", devMode);
> > DataStreamSource<String> sourceStream = env.addSource(new
> FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
> >//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
> <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBAAAAwACABYAAACuAgAAAAAAAAAAAACuAgAATQAAAAFIAAAAAAFSMAAAADEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
> >
> > //2、对源数据进行处理,生成baseInfo基类的数据
> > SingleOutputStreamOperator<BaseInfo> baseInfoStream =
> sourceStream.map(new MapFunction<String, BaseInfo>() {
> >@Override
> > public BaseInfo map(String value) throws Exception {
> > JSONObject jsonObject = JSON.parseObject(value);
> >//获取到不同的服务器IP
> > String serverIp = jsonObject.getString("ip");
> >//获取到不同的data的数据
> > String datas = jsonObject.getString("data");
> >
> > String[] splits = datas.split("\n");
> > HashMap<String, String> dataMap = new HashMap<>();
> >//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
> > String time = splits[0].substring(7, 19);
> >//将subData填充到自定义类型中,用来判断时请求还是应答
> > String subData = datas.substring(0, 10);
> >for (int i = 0; i < splits.length; i++) {
> >if (splits[i].contains("=")) {
> > splits[i] = splits[i].replaceFirst("=", "&");
> > String[] temp = splits[i].split("&");
> >if (temp.length > 1) {
> > dataMap.put(temp[0].toLowerCase(), temp[1]);
> > }
> > }
> > }
> >return new BaseInfo(dataMap.get("action"), serverIp,
> DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
> > }
> > });
> >
> >//3、使用process方法进行baseInfoStream流切割
> > SingleOutputStreamOperator<BaseInfo> tagStream =
> baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
> >
> >//4、根据不同的tag进行不同的输出流设定
> > DataStream<BaseInfo> requestDataStream =
> tagStream.getSideOutput(requestStream);
> > DataStream<BaseInfo> answerDataStream =
> tagStream.getSideOutput(answerStream);
> >
> > requestDataStream.print("requestDataStream");
> > answerDataStream.print("answerDataStream");
> >
> >//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
> > //5.1 先对请求流进行处理
> > SingleOutputStreamOperator<OutInfo> outRequestDataStream =
> requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
> >@Override
> > public OutInfo map(BaseInfo value) throws Exception {
> >//拿到数据中携带的数字的action
> > String actionType = value.getFuncId();
> > System.out.println(actionType);
> > String actionName = null;
> > Connection connection = null;
> > PreparedStatement ps = null;
> >
> >//根据数据的action去MySQL中查找到对应的中午注释
> > try {
> > String sql = "select action_name from ActionType
> where action = ?";
> > connection = JdbcUtil.getConnection();
> > ps = connection.prepareStatement(sql);
> > ps.setString(1, actionType);
> > ResultSet resultSet = ps.executeQuery();
> > System.out.println("resultSet是" + resultSet);
> >if (resultSet.next()) {
> > actionName = resultSet.getString("action_name");
> > }
> > } catch (Exception e) {
> >throw new RuntimeException(e);
> > } finally {
> > JdbcUtil.closeResource(connection, ps);
> > }
> >// return new OutInfo(value.getFuncId(),
> value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
> value.getInfo(), actionName,DateUtils.format(new Date()));
> > return new OutInfo(value.getFuncId(),
> value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
> value.getInfo(), actionName, LocalDateTime.now().toString());
> > }
> > });
> > outRequestDataStream.print("outRequestDataStream");
> >
> >
> >//5.2 对应答流进行处理
> > SingleOutputStreamOperator<OutInfo> outAnswerDataStream =
> answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
> >@Override
> > public OutInfo map(BaseInfo value) throws Exception {
> >//拿到数据中携带的数字的action
> > String actionType = value.getFuncId();
> > System.out.println(actionType);
> > String actionName = null;
> > Connection connection = null;
> > PreparedStatement ps = null;
> >
> >//根据数据的action去MySQL中查找到对应的中午注释
> > try {
> > String sql = "select action_name from ActionType
> where action = ?";
> > connection = JdbcUtil.getConnection();
> > ps = connection.prepareStatement(sql);
> > ps.setString(1, actionType);
> > ResultSet resultSet = ps.executeQuery();
> > System.out.println("resultSet是" + resultSet);
> >if (resultSet.next()) {
> > actionName = resultSet.getString("action_name");
> > }
> > } catch (Exception e) {
> >throw new RuntimeException(e);
> > } finally {
> > JdbcUtil.closeResource(connection, ps);
> > }
> >// return new OutInfo(value.getFuncId(),
> value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
> value.getInfo(), actionName, DateUtils.format(new Date()));
> > return new OutInfo(value.getFuncId(),
> value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(),
> value.getInfo(), actionName, LocalDateTime.now().toString());
> > }
> > });
> > outAnswerDataStream.print("outAnswerDataStream");
> >
> >//6、讲两条流转换为对应的表 进行关联取最小值
> > StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> >//设置状态值为1天
> > tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
> >// tableEnv.createTemporaryView("tableRequest",
> outRequestDataStream);
> >// tableEnv.createTemporaryView("tableAnswer",
> outAnswerDataStream);
> > Table tableRequest =
> tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
> > .column("funcId", DataTypes.STRING())
> > .column("serverIp", DataTypes.STRING())
> > .column("outTime", DataTypes.BIGINT())
> > .column("handleSerialNo", DataTypes.STRING())
> > .column("info", DataTypes.STRING())
> > .column("funcIdDesc", DataTypes.STRING())
> > .column("eventTime", DataTypes.TIMESTAMP(3))
> > .watermark("eventTime", "eventTime - INTERVAL '5' SECOND
> ")
> > .build());
> > Table tableAnswer =
> tableEnv.fromDataStream(outAnswerDataStream,Schema.newBuilder()
> > .column("funcId", DataTypes.STRING())
> > .column("serverIp", DataTypes.STRING())
> > .column("outTime", DataTypes.BIGINT())
> > .column("handleSerialNo", DataTypes.STRING())
> > .column("info", DataTypes.STRING())
> > .column("funcIdDesc", DataTypes.STRING())
> > .column("eventTime", DataTypes.TIMESTAMP(3))
> > .watermark("eventTime", "eventTime - INTERVAL '5' SECOND
> ")
> > .build());
> >
> >
> > Table result = tableEnv.sqlQuery("select \n" +
> >"\ta.funcId as funcId ,\n" +
> >"\ta.funcIdDesc as funcIdDesc,\n" +
> >"\ta.serverIp as serverIp,\n" +
> >"\tb.outTime as maxTime,\n" +
> >"\ta.outTime as minTime,\t\n" +
> >"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
> >" a.eventTime as eventTime\n" +
> >" from "+ tableRequest +" a\n " +
> >" inner join "+ tableAnswer +" b" +
> >" on a.handleSerialNo=b.handleSerialNo ");
> > System.out.println("这个是resultTable"+result);
> > result.printSchema();
> > tableEnv.createTemporaryView("resultTable", result);
> >
> > DataStream<MidInfo> midStream = tableEnv.toAppendStream(result,
> MidInfo.class);
> > Table midTable = tableEnv.fromDataStream(midStream, $("funcId"),
> $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"),
> $("eventTime").rowtime());
> >
> > tableEnv.createTemporaryView("midTable",midTable);
> >
> >//使用TVF的采用渐进式累计窗口进行计算
> > Table resulTable = tableEnv.sqlQuery("SELECT
> funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
> >"FROM TABLE(CUMULATE(\n" +
> >" TABLE midTable "+
> >" , DESCRIPTOR(eventTime)\n" +
> >" , INTERVAL '60' SECOND\n" +
> >" , INTERVAL '1' DAY))\n" +
> >" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
> >
> >
> >
> > DataStream<Tuple2<Boolean, ResultInfo>> resultStream =
> tableEnv.toRetractStream(resulTable, ResultInfo.class);
> > resultStream.print("resultStream");
> > SingleOutputStreamOperator<ResultInfo> resultInfoStream =
> resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>()
> {
> >@Override
> > public ResultInfo map(Tuple2<Boolean, ResultInfo> value)
> throws Exception {
> >return value.f1;
> > }
> > });
> > resultInfoStream.print("resultInfoStream");
> > resultInfoStream.addSink(new Sink2Mysql());
> > env.execute();
> > }
> >}
> >|
> >你好,以上是我的代码,相关报错如下;
> >
> >
> >| 这个是resultTableUnnamedTable$2
> >(
> > `funcId` STRING,
> > `funcIdDesc` STRING,
> > `serverIp` STRING,
> > `maxTime` BIGINT,
> > `minTime` BIGINT,
> > `pk` STRING,
> > `eventTime` TIMESTAMP(3) *ROWTIME*
> >)
> >/* 1 */public class bean$OutInfo$2$Converter implements
> org.apache.flink.table.data.conversion.DataStructureConverter {
> >/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[]
> fieldGetters;
> >/* 3 */ private final
> org.apache.flink.table.data.conversion.DataStructureConverter[]
> fieldConverters;
> >/* 4 */ public
> bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[]
> fieldGetters,
> org.apache.flink.table.data.conversion.DataStructureConverter[]
> fieldConverters) {
> >/* 5 */ this.fieldGetters = fieldGetters;
> >/* 6 */ this.fieldConverters = fieldConverters;
> >/* 7 */ }
> >/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
> >/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
> >/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow =
> new org.apache.flink.table.data.GenericRowData(7);
> >/* 11 */ genericRow.setField(0,
> fieldConverters[0].toInternalOrNull(((java.lang.String)
> external.getFuncId())));
> >/* 12 */ genericRow.setField(1,
> fieldConverters[1].toInternalOrNull(((java.lang.String)
> external.getServerIp())));
> >/* 13 */ genericRow.setField(2,
> fieldConverters[2].toInternalOrNull(((java.lang.Long)
> external.getOutTime())));
> >/* 14 */ genericRow.setField(3,
> fieldConverters[3].toInternalOrNull(((java.lang.String)
> external.getHandleSerialNo())));
> >/* 15 */ genericRow.setField(4,
> fieldConverters[4].toInternalOrNull(((java.lang.String)
> external.getInfo())));
> >/* 16 */ genericRow.setField(5,
> fieldConverters[5].toInternalOrNull(((java.lang.String)
> external.getFuncIdDesc())));
> >/* 17 */ genericRow.setField(6,
> fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime)
> external.getEventTime())));
> >/* 18 */ return genericRow;
> >/* 19 */ }
> >/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
> >/* 21 */ final org.apache.flink.table.data.RowData internal =
> (org.apache.flink.table.data.RowData) o;
> >/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
> >/* 23 */ structured.setFuncId(((java.lang.String)
> fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
> >/* 24 */ structured.setServerIp(((java.lang.String)
> fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
> >/* 25 */ structured.setOutTime(((java.lang.Long)
> fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
> >/* 26 */ structured.setHandleSerialNo(((java.lang.String)
> fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
> >/* 27 */ structured.setInfo(((java.lang.String)
> fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
> >/* 28 */ structured.setFuncIdDesc(((java.lang.String)
> fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
> >/* 29 */ structured.setEventTime(((java.lang.String)
> fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
> >/* 30 */ return structured;
> >/* 31 */ }
> >/* 32 */}
> >
> >/* 1 */public class bean$OutInfo$2$Converter implements
> org.apache.flink.table.data.conversion.DataStructureConverter {
> >/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[]
> fieldGetters;
> >/* 3 */ private final
> org.apache.flink.table.data.conversion.DataStructureConverter[]
> fieldConverters;
> >/* 4 */ public
> bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[]
> fieldGetters,
> org.apache.flink.table.data.conversion.DataStructureConverter[]
> fieldConverters) {
> >/* 5 */ this.fieldGetters = fieldGetters;
> >/* 6 */ this.fieldConverters = fieldConverters;
> >/* 7 */ }
> >/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
> >/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
> >/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow =
> new org.apache.flink.table.data.GenericRowData(7);
> >/* 11 */ genericRow.setField(0,
> fieldConverters[0].toInternalOrNull(((java.lang.String)
> external.getFuncId())));
> >/* 12 */ genericRow.setField(1,
> fieldConverters[1].toInternalOrNull(((java.lang.String)
> external.getServerIp())));
> >/* 13 */ genericRow.setField(2,
> fieldConverters[2].toInternalOrNull(((java.lang.Long)
> external.getOutTime())));
> >/* 14 */ genericRow.setField(3,
> fieldConverters[3].toInternalOrNull(((java.lang.String)
> external.getHandleSerialNo())));
> >/* 15 */ genericRow.setField(4,
> fieldConverters[4].toInternalOrNull(((java.lang.String)
> external.getInfo())));
> >/* 16 */ genericRow.setField(5,
> fieldConverters[5].toInternalOrNull(((java.lang.String)
> external.getFuncIdDesc())));
> >/* 17 */ genericRow.setField(6,
> fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime)
> external.getEventTime())));
> >/* 18 */ return genericRow;
> >/* 19 */ }
> >/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
> >/* 21 */ final org.apache.flink.table.data.RowData internal =
> (org.apache.flink.table.data.RowData) o;
> >/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
> >/* 23 */ structured.setFuncId(((java.lang.String)
> fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
> >/* 24 */ structured.setServerIp(((java.lang.String)
> fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
> >/* 25 */ structured.setOutTime(((java.lang.Long)
> fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
> >/* 26 */ structured.setHandleSerialNo(((java.lang.String)
> fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
> >/* 27 */ structured.setInfo(((java.lang.String)
> fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
> >/* 28 */ structured.setFuncIdDesc(((java.lang.String)
> fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
> >/* 29 */ structured.setEventTime(((java.lang.String)
> fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
> >/* 30 */ return structured;
> >/* 31 */ }
> >/* 32 */}
> >
> >Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> > at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
> > at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> > at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> > at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> > at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> > at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
> > at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> > at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> > at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> > at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> > at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> > at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> > at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> > at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> > at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> > at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> > at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> > at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> > at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> > at akka.dispatch.OnComplete.internal(Future.scala:300)
> > at akka.dispatch.OnComplete.internal(Future.scala:297)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> > at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> > at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> > at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> > at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> > at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> > at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> > at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> > at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> > at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> > at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> > at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> > at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> > at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> > at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> > at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> > at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> >Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> > at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> > at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> > at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
> > at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
> > at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
> > at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
> > at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> > at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
> > at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> > at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
> > at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
> > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> > at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> > at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at akka.actor.Actor.aroundReceive(Actor.scala:537)
> > at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> > ... 4 more
> >Caused by: org.apache.flink.table.api.TableException: Error while
> generating structured type converter.
> > at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
> > at
> org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46)
> > at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76)
> > at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> > at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> > at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> > at java.lang.Thread.run(Thread.java:748)
> >Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> > at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
> > at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
> > ... 12 more
> >Caused by:
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
> > at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
> > ... 13 more
> >Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
> > at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
> > at
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
> > at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
> > ... 16 more
> >Caused by: org.codehaus.commons.compiler.CompileException: Line 17,
> Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
> > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
> > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
> > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
> > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
> > at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
> > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
> > at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
> > at
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
> > at
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
> > at
> org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
> > at
> org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
> > at
> org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
> > at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
> > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> > at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> > at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> > at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> > at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> > at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> > at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> > at
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
> > at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> > at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
> > at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
> > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
> > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> > at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> > at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
> > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> > at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> > at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> > at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
> > ... 22 more
> >
> >Process finished with exit code 1
> >
> >|
> >
> >
> >| |
> >小昌同学
> >|
> >|
> >ccc0606fighting@163.com
> >|
> >---- 回复的原邮件 ----
> >| 发件人 | lxk<lx...@163.com> |
> >| 发送日期 | 2023年5月15日 18:21 |
> >| 收件人 | <us...@flink.apache.org> |
> >| 主题 | Re:报错显示为bug |
> >你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2023-05-15 17:11:42,"小昌同学" <cc...@163.com> 写道:
> >各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue. “
> >flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作
> >
> >
> >| |
> >小昌同学
> >|
> >|
> >ccc0606fighting@163.com
> >|
>
Re:回复:报错显示为bug
Posted by lxk <lx...@163.com>.
你好,从报错来看是类型不兼容导致的。
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime" 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换
At 2023-05-15 18:29:15, "小昌同学" <cc...@163.com> wrote:
>|
>package job;
>import bean.BaseInfo;
>import bean.MidInfo;
>import bean.OutInfo;
>import bean.ResultInfo;
>import com.alibaba.fastjson.JSON;
>import com.alibaba.fastjson.JSONObject;
>import config.FlinkConfig;
>import function.MyProcessFunction;
>import org.apache.flink.api.common.functions.MapFunction;
>import org.apache.flink.api.common.serialization.SimpleStringSchema;
>import org.apache.flink.api.java.tuple.Tuple2;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import org.apache.flink.streaming.api.datastream.DataStream;
>import org.apache.flink.streaming.api.datastream.DataStreamSource;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>import org.apache.flink.table.api.DataTypes;
>import org.apache.flink.table.api.Schema;
>import org.apache.flink.table.api.Table;
>import org.apache.flink.table.api.TableSchema;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.types.DataType;
>import org.apache.flink.util.OutputTag;
>import sink.Sink2Mysql;
>import utils.DateUtil;
>import utils.DateUtils;
>import utils.JdbcUtil;
>
>import java.sql.Connection;
>import java.sql.PreparedStatement;
>import java.sql.ResultSet;
>import java.time.*;
>import java.util.Date;
>import java.util.HashMap;
>import java.util.Properties;
>
>public class RytLogAnly4 {
>public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>//使用侧输出流
> OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {
> };
> OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {
> };
>
>//1、连接测试环境kafka的数据
> String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
> String topicName = FlinkConfig.config.getProperty("dev_topicName");
> String groupId = FlinkConfig.config.getProperty("dev_groupId");
> String devMode = FlinkConfig.config.getProperty("dev_mode");
> Properties prop = new Properties();
> prop.setProperty("bootstrap.servers", servers);
> prop.setProperty("group.id", groupId);
> prop.setProperty("auto.offset.reset", devMode);
> DataStreamSource<String> sourceStream = env.addSource(new FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop));
>//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBAAAAwACABYAAACuAgAAAAAAAAAAAACuAgAATQAAAAFIAAAAAAFSMAAAADEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
>
> //2、对源数据进行处理,生成baseInfo基类的数据
> SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
>@Override
> public BaseInfo map(String value) throws Exception {
> JSONObject jsonObject = JSON.parseObject(value);
>//获取到不同的服务器IP
> String serverIp = jsonObject.getString("ip");
>//获取到不同的data的数据
> String datas = jsonObject.getString("data");
>
> String[] splits = datas.split("\n");
> HashMap<String, String> dataMap = new HashMap<>();
>//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间
> String time = splits[0].substring(7, 19);
>//将subData填充到自定义类型中,用来判断时请求还是应答
> String subData = datas.substring(0, 10);
>for (int i = 0; i < splits.length; i++) {
>if (splits[i].contains("=")) {
> splits[i] = splits[i].replaceFirst("=", "&");
> String[] temp = splits[i].split("&");
>if (temp.length > 1) {
> dataMap.put(temp[0].toLowerCase(), temp[1]);
> }
> }
> }
>return new BaseInfo(dataMap.get("action"), serverIp, DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
> }
> });
>
>//3、使用process方法进行baseInfoStream流切割
> SingleOutputStreamOperator<BaseInfo> tagStream = baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));
>
>//4、根据不同的tag进行不同的输出流设定
> DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
> DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);
>
> requestDataStream.print("requestDataStream");
> answerDataStream.print("answerDataStream");
>
>//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表
> //5.1 先对请求流进行处理
> SingleOutputStreamOperator<OutInfo> outRequestDataStream = requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
>@Override
> public OutInfo map(BaseInfo value) throws Exception {
>//拿到数据中携带的数字的action
> String actionType = value.getFuncId();
> System.out.println(actionType);
> String actionName = null;
> Connection connection = null;
> PreparedStatement ps = null;
>
>//根据数据的action去MySQL中查找到对应的中午注释
> try {
> String sql = "select action_name from ActionType where action = ?";
> connection = JdbcUtil.getConnection();
> ps = connection.prepareStatement(sql);
> ps.setString(1, actionType);
> ResultSet resultSet = ps.executeQuery();
> System.out.println("resultSet是" + resultSet);
>if (resultSet.next()) {
> actionName = resultSet.getString("action_name");
> }
> } catch (Exception e) {
>throw new RuntimeException(e);
> } finally {
> JdbcUtil.closeResource(connection, ps);
> }
>// return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName,DateUtils.format(new Date()));
> return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, LocalDateTime.now().toString());
> }
> });
> outRequestDataStream.print("outRequestDataStream");
>
>
>//5.2 对应答流进行处理
> SingleOutputStreamOperator<OutInfo> outAnswerDataStream = answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() {
>@Override
> public OutInfo map(BaseInfo value) throws Exception {
>//拿到数据中携带的数字的action
> String actionType = value.getFuncId();
> System.out.println(actionType);
> String actionName = null;
> Connection connection = null;
> PreparedStatement ps = null;
>
>//根据数据的action去MySQL中查找到对应的中午注释
> try {
> String sql = "select action_name from ActionType where action = ?";
> connection = JdbcUtil.getConnection();
> ps = connection.prepareStatement(sql);
> ps.setString(1, actionType);
> ResultSet resultSet = ps.executeQuery();
> System.out.println("resultSet是" + resultSet);
>if (resultSet.next()) {
> actionName = resultSet.getString("action_name");
> }
> } catch (Exception e) {
>throw new RuntimeException(e);
> } finally {
> JdbcUtil.closeResource(connection, ps);
> }
>// return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, DateUtils.format(new Date()));
> return new OutInfo(value.getFuncId(), value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), actionName, LocalDateTime.now().toString());
> }
> });
> outAnswerDataStream.print("outAnswerDataStream");
>
>//6、讲两条流转换为对应的表 进行关联取最小值
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>//设置状态值为1天
> tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));
>// tableEnv.createTemporaryView("tableRequest", outRequestDataStream);
>// tableEnv.createTemporaryView("tableAnswer", outAnswerDataStream);
> Table tableRequest = tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder()
> .column("funcId", DataTypes.STRING())
> .column("serverIp", DataTypes.STRING())
> .column("outTime", DataTypes.BIGINT())
> .column("handleSerialNo", DataTypes.STRING())
> .column("info", DataTypes.STRING())
> .column("funcIdDesc", DataTypes.STRING())
> .column("eventTime", DataTypes.TIMESTAMP(3))
> .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
> .build());
> Table tableAnswer = tableEnv.fromDataStream(outAnswerDataStream,Schema.newBuilder()
> .column("funcId", DataTypes.STRING())
> .column("serverIp", DataTypes.STRING())
> .column("outTime", DataTypes.BIGINT())
> .column("handleSerialNo", DataTypes.STRING())
> .column("info", DataTypes.STRING())
> .column("funcIdDesc", DataTypes.STRING())
> .column("eventTime", DataTypes.TIMESTAMP(3))
> .watermark("eventTime", "eventTime - INTERVAL '5' SECOND ")
> .build());
>
>
> Table result = tableEnv.sqlQuery("select \n" +
>"\ta.funcId as funcId ,\n" +
>"\ta.funcIdDesc as funcIdDesc,\n" +
>"\ta.serverIp as serverIp,\n" +
>"\tb.outTime as maxTime,\n" +
>"\ta.outTime as minTime,\t\n" +
>"\tconcat(a.funcId,a.serverIp) as pk ,\n" +
>" a.eventTime as eventTime\n" +
>" from "+ tableRequest +" a\n " +
>" inner join "+ tableAnswer +" b" +
>" on a.handleSerialNo=b.handleSerialNo ");
> System.out.println("这个是resultTable"+result);
> result.printSchema();
> tableEnv.createTemporaryView("resultTable", result);
>
> DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, MidInfo.class);
> Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), $("eventTime").rowtime());
>
> tableEnv.createTemporaryView("midTable",midTable);
>
>//使用TVF的采用渐进式累计窗口进行计算
> Table resulTable = tableEnv.sqlQuery("SELECT funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" +
>"FROM TABLE(CUMULATE(\n" +
>" TABLE midTable "+
>" , DESCRIPTOR(eventTime)\n" +
>" , INTERVAL '60' SECOND\n" +
>" , INTERVAL '1' DAY))\n" +
>" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk");
>
>
>
> DataStream<Tuple2<Boolean, ResultInfo>> resultStream = tableEnv.toRetractStream(resulTable, ResultInfo.class);
> resultStream.print("resultStream");
> SingleOutputStreamOperator<ResultInfo> resultInfoStream = resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() {
>@Override
> public ResultInfo map(Tuple2<Boolean, ResultInfo> value) throws Exception {
>return value.f1;
> }
> });
> resultInfoStream.print("resultInfoStream");
> resultInfoStream.addSink(new Sink2Mysql());
> env.execute();
> }
>}
>|
>你好,以上是我的代码,相关报错如下;
>
>
>| 这个是resultTableUnnamedTable$2
>(
> `funcId` STRING,
> `funcIdDesc` STRING,
> `serverIp` STRING,
> `maxTime` BIGINT,
> `minTime` BIGINT,
> `pk` STRING,
> `eventTime` TIMESTAMP(3) *ROWTIME*
>)
>/* 1 */public class bean$OutInfo$2$Converter implements org.apache.flink.table.data.conversion.DataStructureConverter {
>/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters;
>/* 3 */ private final org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
>/* 4 */ public bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters) {
>/* 5 */ this.fieldGetters = fieldGetters;
>/* 6 */ this.fieldConverters = fieldConverters;
>/* 7 */ }
>/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
>/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
>/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = new org.apache.flink.table.data.GenericRowData(7);
>/* 11 */ genericRow.setField(0, fieldConverters[0].toInternalOrNull(((java.lang.String) external.getFuncId())));
>/* 12 */ genericRow.setField(1, fieldConverters[1].toInternalOrNull(((java.lang.String) external.getServerIp())));
>/* 13 */ genericRow.setField(2, fieldConverters[2].toInternalOrNull(((java.lang.Long) external.getOutTime())));
>/* 14 */ genericRow.setField(3, fieldConverters[3].toInternalOrNull(((java.lang.String) external.getHandleSerialNo())));
>/* 15 */ genericRow.setField(4, fieldConverters[4].toInternalOrNull(((java.lang.String) external.getInfo())));
>/* 16 */ genericRow.setField(5, fieldConverters[5].toInternalOrNull(((java.lang.String) external.getFuncIdDesc())));
>/* 17 */ genericRow.setField(6, fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) external.getEventTime())));
>/* 18 */ return genericRow;
>/* 19 */ }
>/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
>/* 21 */ final org.apache.flink.table.data.RowData internal = (org.apache.flink.table.data.RowData) o;
>/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
>/* 23 */ structured.setFuncId(((java.lang.String) fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
>/* 24 */ structured.setServerIp(((java.lang.String) fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
>/* 25 */ structured.setOutTime(((java.lang.Long) fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
>/* 26 */ structured.setHandleSerialNo(((java.lang.String) fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
>/* 27 */ structured.setInfo(((java.lang.String) fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
>/* 28 */ structured.setFuncIdDesc(((java.lang.String) fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
>/* 29 */ structured.setEventTime(((java.lang.String) fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
>/* 30 */ return structured;
>/* 31 */ }
>/* 32 */}
>
>/* 1 */public class bean$OutInfo$2$Converter implements org.apache.flink.table.data.conversion.DataStructureConverter {
>/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters;
>/* 3 */ private final org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
>/* 4 */ public bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters) {
>/* 5 */ this.fieldGetters = fieldGetters;
>/* 6 */ this.fieldConverters = fieldConverters;
>/* 7 */ }
>/* 8 */ public java.lang.Object toInternal(java.lang.Object o) {
>/* 9 */ final bean.OutInfo external = (bean.OutInfo) o;
>/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = new org.apache.flink.table.data.GenericRowData(7);
>/* 11 */ genericRow.setField(0, fieldConverters[0].toInternalOrNull(((java.lang.String) external.getFuncId())));
>/* 12 */ genericRow.setField(1, fieldConverters[1].toInternalOrNull(((java.lang.String) external.getServerIp())));
>/* 13 */ genericRow.setField(2, fieldConverters[2].toInternalOrNull(((java.lang.Long) external.getOutTime())));
>/* 14 */ genericRow.setField(3, fieldConverters[3].toInternalOrNull(((java.lang.String) external.getHandleSerialNo())));
>/* 15 */ genericRow.setField(4, fieldConverters[4].toInternalOrNull(((java.lang.String) external.getInfo())));
>/* 16 */ genericRow.setField(5, fieldConverters[5].toInternalOrNull(((java.lang.String) external.getFuncIdDesc())));
>/* 17 */ genericRow.setField(6, fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) external.getEventTime())));
>/* 18 */ return genericRow;
>/* 19 */ }
>/* 20 */ public java.lang.Object toExternal(java.lang.Object o) {
>/* 21 */ final org.apache.flink.table.data.RowData internal = (org.apache.flink.table.data.RowData) o;
>/* 22 */ final bean.OutInfo structured = new bean.OutInfo();
>/* 23 */ structured.setFuncId(((java.lang.String) fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
>/* 24 */ structured.setServerIp(((java.lang.String) fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
>/* 25 */ structured.setOutTime(((java.lang.Long) fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal))));
>/* 26 */ structured.setHandleSerialNo(((java.lang.String) fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal))));
>/* 27 */ structured.setInfo(((java.lang.String) fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal))));
>/* 28 */ structured.setFuncIdDesc(((java.lang.String) fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal))));
>/* 29 */ structured.setEventTime(((java.lang.String) fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal))));
>/* 30 */ return structured;
>/* 31 */ }
>/* 32 */}
>
>Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
> at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> at akka.dispatch.OnComplete.internal(Future.scala:300)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
> at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
> at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> ... 4 more
>Caused by: org.apache.flink.table.api.TableException: Error while generating structured type converter.
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
> at org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46)
> at org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:748)
>Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
> ... 12 more
>Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
> at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
> ... 13 more
>Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
> at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
> at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
> ... 16 more
>Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
> at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
> at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
> at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
> at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
> at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
> ... 22 more
>
>Process finished with exit code 1
>
>|
>
>
>| |
>小昌同学
>|
>|
>ccc0606fighting@163.com
>|
>---- 回复的原邮件 ----
>| 发件人 | lxk<lx...@163.com> |
>| 发送日期 | 2023年5月15日 18:21 |
>| 收件人 | <us...@flink.apache.org> |
>| 主题 | Re:报错显示为bug |
>你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2023-05-15 17:11:42,"小昌同学" <cc...@163.com> 写道:
>各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. “
>flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作
>
>
>| |
>小昌同学
>|
>|
>ccc0606fighting@163.com
>|