You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by jun su <su...@gmail.com> on 2020/04/24 06:05:36 UTC
Blink模式下运用collect方法快速获取结果
hi all,
blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
结果用于代码调试么?
--
Best,
Jun Su
Re: Blink模式下运用collect方法快速获取结果
Posted by jun su <su...@gmail.com>.
非常感谢, 我用的flink-1.9.2 , 但是直接将代码copy过来可以用了!
Jingsong Li <ji...@gmail.com> 于2020年4月24日周五 下午3:02写道:
> 1.10里面有TableUtils了,里面有collectToList
>
>
> Best,
> Jingsong Lee
>
> On Fri, Apr 24, 2020 at 2:49 PM jun su <su...@gmail.com> wrote:
>
> > hi all,
> >
> > 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
> > 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:
> >
> > def collect[T](
> > tEnv: TableEnvironment,
> > table: Table,
> > sink: CollectTableSink[T],
> > jobName: Option[String]): Seq[T] = {
> > val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
> > .asInstanceOf[TypeInformation[T]]
> > .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
> > .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
> > val id = new AbstractID().toString
> > sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
> > val sinkName = UUID.randomUUID().toString
> > tEnv.registerTableSink(sinkName, sink)
> > tEnv.insertInto(table, sinkName)
> >
> > val res = tEnv.execute("test")
> > val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
> > SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> > }
> >
> >
> > jun su <su...@gmail.com> 于2020年4月24日周五 下午2:05写道:
> >
> > > hi all,
> > >
> > > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> > > 结果用于代码调试么?
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best, Jingsong Lee
>
--
Best,
Jun Su
Re: Blink模式下运用collect方法快速获取结果
Posted by Jingsong Li <ji...@gmail.com>.
1.10里面有TableUtils了,里面有collectToList
Best,
Jingsong Lee
On Fri, Apr 24, 2020 at 2:49 PM jun su <su...@gmail.com> wrote:
> hi all,
>
> 找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
> 但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:
>
> def collect[T](
> tEnv: TableEnvironment,
> table: Table,
> sink: CollectTableSink[T],
> jobName: Option[String]): Seq[T] = {
> val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
> .asInstanceOf[TypeInformation[T]]
> .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
> .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
> val id = new AbstractID().toString
> sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
> val sinkName = UUID.randomUUID().toString
> tEnv.registerTableSink(sinkName, sink)
> tEnv.insertInto(table, sinkName)
>
> val res = tEnv.execute("test")
> val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
> SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> }
>
>
> jun su <su...@gmail.com> 于2020年4月24日周五 下午2:05写道:
>
> > hi all,
> >
> > blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> > 结果用于代码调试么?
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>
--
Best, Jingsong Lee
Re: Blink模式下运用collect方法快速获取结果
Posted by jun su <su...@gmail.com>.
hi all,
找到了源码中BatchTableEnvUtil类使用了CollectTableSink来做collect的逻辑,
但是下方代码运用了源码内部的private方法, 看起来不允许外部调用:
def collect[T](
tEnv: TableEnvironment,
table: Table,
sink: CollectTableSink[T],
jobName: Option[String]): Seq[T] = {
val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
.createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
.getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
val id = new AbstractID().toString
sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
val sinkName = UUID.randomUUID().toString
tEnv.registerTableSink(sinkName, sink)
tEnv.insertInto(table, sinkName)
val res = tEnv.execute("test")
val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
SerializedListAccumulator.deserializeList(accResult, typeSerializer)
}
jun su <su...@gmail.com> 于2020年4月24日周五 下午2:05写道:
> hi all,
>
> blink模式下没法将table 转为 dataset , 所以如果直接collect了, 请问有类似方法可以获取到
> 结果用于代码调试么?
>
> --
> Best,
> Jun Su
>
--
Best,
Jun Su