Posted to user@flink.apache.org by Christophe Salperwyck <ch...@gmail.com> on 2016/06/06 11:49:42 UTC

HBase Input Format for streaming

Hi all,

I am trying to read data from HBase and use the window functions of Flink
streaming. I can read my data using the ExecutionEnvironment but not with
the StreamExecutionEnvironment.

Is that a known issue?

Are the input splits used in the streaming environment?

Here is a sample of my code:

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

@SuppressWarnings("serial")
final DataStreamSource<ANA> anaDS = env.createInput(new TableInputFormat<ANA>() {
    ...
});

final WindowedStream<ANA, Tuple, TimeWindow> ws = anaDS
    .assignTimestampsAndWatermarks(new xxxxAssignerWithPunctuatedWatermarks())
    .keyBy(0)
    .timeWindow(Time.days(30), Time.days(30));

ws.sum(2).printToErr();
env.execute();

The error I get is:
Caused by: java.io.IOException: No table result scanner provided!
    at org.apache.flink.addons.hbase.TableInputFormat.nextRecord(TableInputFormat.java:103)

It seems the first "Result" is not read before this function is called.

I built a "StreamingTableInputFormat" as a temporary workaround, but let me
know if I did something wrong.

Thanks for everything, Flink is great!

Cheers,
Christophe

Re: HBase Input Format for streaming

Posted by Christophe Salperwyck <ch...@gmail.com>.
I just did this:

public T nextRecord(final T reuse) throws IOException {
    if (this.rs == null) {
        // throw new IOException("No table result scanner provided!");
        return null;
    }
    ...

because in the class FileSourceFunction we have:

@Override
public void run(SourceContext<OUT> ctx) throws Exception {
    while (isRunning) {
        OUT nextElement = serializer.createInstance();
        nextElement = format.nextRecord(nextElement);
        if (nextElement == null && splitIterator.hasNext()) {
            format.open(splitIterator.next());
            continue;
        } else if (nextElement == null) {
            break;
        }
        ctx.collect(nextElement);
    }
}

(I had to copy TableInputSplit as its constructor is not visible...)
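
To make the interaction easier to see, here is a minimal sketch of the loop above without any Flink or HBase dependency. SketchFormat and SplitLoopSketch are hypothetical stand-ins, not real classes from either project, and the sketch throws an unchecked IllegalStateException where the real TableInputFormat throws an IOException. It only illustrates the control flow: the loop calls nextRecord() before any split has been opened, so a format that throws on a missing scanner fails immediately, while one that returns null lets the loop open the first split.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an input format; not the Flink or HBase API.
class SketchFormat {
    // Plays the role of the HBase result scanner; stays null until open() is called.
    private Iterator<String> scanner;
    private final boolean lenient;

    SketchFormat(boolean lenient) {
        this.lenient = lenient;
    }

    void open(List<String> split) {
        scanner = split.iterator();
    }

    String nextRecord() {
        if (scanner == null) {
            if (lenient) {
                return null; // workaround: report "no record" so the caller opens a split
            }
            // Assumption: unchecked exception used here only to keep the sketch short.
            throw new IllegalStateException("No table result scanner provided!");
        }
        return scanner.hasNext() ? scanner.next() : null;
    }
}

class SplitLoopSketch {
    // Same control flow as the quoted run() loop: read first, open a split only on null.
    static List<String> drain(SketchFormat format, Iterator<List<String>> splits) {
        List<String> out = new ArrayList<>();
        while (true) {
            String next = format.nextRecord();
            if (next == null && splits.hasNext()) {
                format.open(splits.next());
                continue;
            } else if (next == null) {
                break;
            }
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> splits =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));

        // Lenient format: every split is opened and read in turn.
        System.out.println(drain(new SketchFormat(true), splits.iterator()));

        // Strict format: fails on the very first nextRecord() call.
        try {
            drain(new SketchFormat(false), splits.iterator());
        } catch (IllegalStateException e) {
            System.out.println("strict format failed: " + e.getMessage());
        }
    }
}
```

In this sketch the lenient format yields all records from both splits, while the strict one fails before anything is read, which matches the error reported in the first message.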


2016-06-06 16:07 GMT+02:00 Ufuk Celebi <uc...@apache.org>:

> From the code it looks like the open method of the TableInputFormat is
> never called. What are you doing differently in the
> StreamingTableInputFormat?
>
> – Ufuk

Re: HBase Input Format for streaming

Posted by Ufuk Celebi <uc...@apache.org>.
From the code it looks like the open method of the TableInputFormat is
never called. What are you doing differently in the
StreamingTableInputFormat?

– Ufuk

