Posted to dev@calcite.apache.org by Feng Zhu <we...@gmail.com> on 2020/01/02 03:55:06 UTC
Re: Understanding the Window Enumerable Process
Hi, Shawn
> I was hoping someone could give me a high-level overview of how Calcite is
> executing the window function.
Calcite's runtime generates Java code and compiles it with Janino. For the
window enumerable process, there is a sample of the generated code in
*org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*,
which may be what you need.
In general, input rows are first organized into an
*org.apache.calcite.runtime.SortedMultiMap* according to the partitioning key,
and then all of the windowed aggregate functions are evaluated for each
list of rows that share the same partitioning key.
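The partition-then-evaluate flow described above can be sketched in plain Java. This is a hypothetical illustration, not the code Calcite generates: a TreeMap stands in for SortedMultiMap, the class and method names are made up, and rows are assumed to be Object[] with non-null partition keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical sketch: group rows by partition key, then run ROW_NUMBER() per group. */
public class PartitionedRowNumber {

  /** Returns each input row with its ROW_NUMBER() appended as the last column. */
  static <K extends Comparable<K>> List<Object[]> rowNumberPerPartition(
      List<Object[]> rows, Function<Object[], K> partitionKey) {
    // Pass 1: buffer every row into its partition (the whole input is held in memory).
    // TreeMap keeps partitions sorted by key; it rejects null keys (assumed non-null here).
    Map<K, List<Object[]>> partitions = new TreeMap<>();
    for (Object[] row : rows) {
      partitions.computeIfAbsent(partitionKey.apply(row), k -> new ArrayList<>()).add(row);
    }
    // Pass 2: evaluate the window function over each partition's list of rows.
    List<Object[]> result = new ArrayList<>(rows.size());
    for (List<Object[]> partition : partitions.values()) {
      long rowNumber = 0; // restarts at 1 for every partition
      for (Object[] row : partition) {
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = ++rowNumber;
        result.add(out);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {"b", "x"});
    rows.add(new Object[] {"a", "y"});
    rows.add(new Object[] {"b", "z"});
    // Partition on column 0; prints [a, y, 1], [b, x, 1], [b, z, 2]
    for (Object[] r : rowNumberPerPartition(rows, r -> (String) r[0])) {
      System.out.println(Arrays.toString(r));
    }
  }
}
```

With an empty PARTITION BY, as in the query in this thread, every row falls into a single partition, so the whole input ends up in one buffered list.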
> I would expect it to have to iterate through the data at least once for the
> window and once for the data itself.
Could you elaborate?
Best,
Feng
Shawn Weeks <sw...@weeksconsulting.us> wrote on Wed, Jan 1, 2020 at 1:23 AM:
> Hi, I’m trying to troubleshoot an issue with the Apache NiFi project's use
> of Calcite to allow queries against flow files. NiFi presents an enumerable
> interface to Calcite (org.apache.calcite.linq4j.EnumerableDefaults). For
> queries with a simple WHERE clause and aggregations everything works great,
> but when I use window analytic functions like row_number against larger
> sets of records (2 GB), the process uses all allocated memory and every CPU
> core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in live
> bytes, and most of the CPU usage is NiFi parsing the records.
>
> I was hoping someone could give me a high-level overview of how Calcite is
> executing the window function. I would expect it to have to iterate through
> the data at least once for the window and once for the data itself.
>
> Here is the query being executed and a trace of the Calcite execution plan.
>
> select "antiNucleus",
> "eventFile",
> "eventNumber",
> "eventTime",
> "histFile",
> "multiplicity",
> "NaboveLb",
> "NbelowLb",
> "NLb",
> "primaryTracks",
> "prodTime",
> "Pt",
> "runNumber",
> "vertexX",
> "vertexY",
> "vertexZ",
> row_number() over() id
> from flowfile;
>
> 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root:
> rel#10:Subset#1.ENUMERABLE.[]
> Original rel:
> LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0],
> eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
> multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
> primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
> vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
> cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
> FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
> table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
> 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0
> io}, id = 0
>
> Sets:
> Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ)
> rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
>
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
> Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ, BIGINT id)
> rel#8:Subset#1.NONE.[], best=null, importance=0.9
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0,
> cumulative cost={inf}
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
> rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
>
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
> rowcount=100.0, cumulative cost={inf}
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
>
>
> 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#16
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Cheapest plan:
> EnumerableWindow(window#0=[window(partition {} order by [] rows between
> UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount =
> 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
> FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7,
> 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0
> rows, 101.0 cpu, 0.0 io}, id = 0
>
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Provenance:
> EnumerableWindow#16
> direct
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
> call#112 rule [EnumerableWindowRule]
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
> call#64 rule [ProjectToWindowRule:project]
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
> no parent
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
> no parent
>
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#17
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#18
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#19
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Breadth-first from root: {
> HepRelVertex#19 =
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
> HepRelVertex#17 =
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
>
Re: Understanding the Window Enumerable Process
Posted by Feng Zhu <we...@gmail.com>.
The memory issue may be related to the implementation.
For the row_number() function, the runtime processing logic is shown
below. As you can see, too many data structures (lists, arrays and maps)
are created.
Best,
Feng
=====================================================================================================
// 1. Load the data set into a List
final java.util.List tempList = (java.util.List)
    org.apache.calcite.linq4j.Linq4j.asEnumerable(***your data set***)
        .into(new java.util.ArrayList());
// 2. Sort the data set (note: a new map, array and iterator will be
//    constructed; see SortedMultiMap)
final java.util.Comparator comparator = new java.util.Comparator(){...};
final java.util.Iterator iterator =
    org.apache.calcite.runtime.SortedMultiMap.singletonArrayIterator(comparator,
        tempList);
// 3. Construct a new ArrayList for the result
final java.util.ArrayList _list = new java.util.ArrayList(tempList.size());
// 4. Compute row_number()
Long a0w0 = (Long) null; // for row_number
while (iterator.hasNext()) {
  final Object[] _rows = (Object[]) iterator.next();
  for (int i = 0; i < _rows.length; ++i) {
    final org.apache.calcite.test.JdbcTest.Employee row =
        (org.apache.calcite.test.JdbcTest.Employee) _rows[i];
    ......
    _list.add(new Object[] {
        ...... // row columns
        a0w0});
  }
}
tempList.clear();
final org.apache.calcite.linq4j.Enumerable _inputEnumerable =
    org.apache.calcite.linq4j.Linq4j.asEnumerable(_list);
return new org.apache.calcite.linq4j.AbstractEnumerable(){
  public org.apache.calcite.linq4j.Enumerator enumerator() {
    return new org.apache.calcite.linq4j.Enumerator(){
      public final org.apache.calcite.linq4j.Enumerator
          inputEnumerator = _inputEnumerable.enumerator();
      ......
      public Object current() {
        final Object[] current = (Object[]) inputEnumerator.current();
        return new Object[] { ... // row columns
        };
      }
    };
  }
};
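To make the cost of these steps concrete, here is a simplified, runnable re-creation that uses only java.util collections. It is a sketch under assumptions, not the code Calcite generates: the class and method names are hypothetical, a stable Arrays.sort over an array stands in for SortedMultiMap.singletonArrayIterator, and the final enumerator (which in the generated code builds yet another Object[] per row in current()) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical plain-Java analogue of the buffered row_number() steps above. */
public class BufferedWindowSketch {

  static List<Object[]> rowNumber(Iterable<Object[]> input, Comparator<Object[]> comparator) {
    // Step 1: drain the whole input into a List before producing any output.
    List<Object[]> tempList = new ArrayList<>();
    for (Object[] row : input) {
      tempList.add(row);
    }
    // Step 2: copy the references into an array and sort it. Arrays.sort with a
    // Comparator is stable, so an "all rows equal" comparator keeps input order.
    Object[][] sorted = tempList.toArray(new Object[0][]);
    Arrays.sort(sorted, comparator);
    // Step 3: allocate the result list up front.
    List<Object[]> result = new ArrayList<>(tempList.size());
    // Step 4: build one new, wider Object[] per row with ROW_NUMBER() in the last slot.
    long rowNumber = 0;
    for (Object[] row : sorted) {
      Object[] out = Arrays.copyOf(row, row.length + 1);
      out[row.length] = ++rowNumber;
      result.add(out);
    }
    tempList.clear();
    return result; // the caller sees nothing until the entire input has been copied
  }

  public static void main(String[] args) {
    List<Object[]> input = new ArrayList<>();
    input.add(new Object[] {"a"});
    input.add(new Object[] {"b"});
    input.add(new Object[] {"c"});
    Comparator<Object[]> noOrderBy = (x, y) -> 0; // OVER () has no ORDER BY
    // Prints [a, 1], [b, 2], [c, 3]
    for (Object[] row : rowNumber(input, noOrderBy)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```

The strings themselves are shared between copies; what multiplies is the per-row container overhead (list slots, the sorted array, and a new Object[] per row), all of which must be live at the same time before the first row is returned.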
Shawn Weeks <sw...@weeksconsulting.us> wrote on Fri, Jan 3, 2020 at 12:16 AM:
> In my example I'm not setting a partition key, as I'm trying to generate
> just a sequential list of integers. It sounds like everything is held in
> memory in a SortedMultiMap, which explains the high memory usage, though I'm
> not sure why it's using several times as much memory as the input record set.
> My test file is a 2 GB CSV, and I hit out-of-memory errors even with
> '-Xmx16g' on the process.
>
> My last question was trying to figure out how many times Calcite would
> have to iterate over the data to produce a result. When I give it a large
> amount of memory it does eventually finish, but it takes substantially
> longer than a single pass over the data. For example, a simple select *
> might take 2-3 minutes for my test file, but the row_number version takes
> 30+ minutes, if it finishes at all.
>
> Just in case I missed it: Calcite doesn't have a simple counter function
> like Oracle's ROWNUM, does it?
>
> Thanks
> Shawn
Re: Understanding the Window Enumerable Process
Posted by Shawn Weeks <sw...@weeksconsulting.us>.
In my example I'm not setting a partition key, as I'm trying to generate just a sequential list of integers. It sounds like everything is held in memory in a SortedMultiMap, which explains the high memory usage, though I'm not sure why it's using several times as much memory as the input record set. My test file is a 2 GB CSV, and I hit out-of-memory errors even with '-Xmx16g' on the process.
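As a rough illustration of how a buffered row set can be several times the size of the CSV, here is a back-of-envelope estimate for a single row of this query (16 String columns, per the row type in the plan). Every size constant is an assumption about a 64-bit HotSpot JVM with compressed oops and compact strings, and the 8-character average field width is hypothetical; a heap profiler is the way to get real numbers.

```java
/**
 * Back-of-envelope heap estimate for ONE buffered row. All sizes are
 * ASSUMPTIONS for a 64-bit HotSpot JVM with compressed oops and compact
 * strings (Java 9+); they are not measurements of NiFi or Calcite.
 */
public class RowHeapEstimate {
  static final int STRING_OBJECT = 24; // assumed String instance size, padded
  static final int ARRAY_HEADER = 16;  // assumed header of a byte[] or Object[]
  static final int REFERENCE = 4;      // assumed compressed reference size
  static final int BOXED_LONG = 24;    // assumed java.lang.Long instance size

  static long align8(long n) {
    return (n + 7) / 8 * 8; // objects are padded to a multiple of 8 bytes
  }

  /** One Latin-1 String: the String object plus its backing byte[]. */
  static long stringBytes(int chars) {
    return STRING_OBJECT + align8(ARRAY_HEADER + chars);
  }

  /** One Object[] with the given number of reference slots. */
  static long objectArrayBytes(int slots) {
    return align8(ARRAY_HEADER + (long) REFERENCE * slots);
  }

  /** CSV size of a row: each field plus one delimiter or newline. */
  static long csvBytesPerRow(int columns, int avgChars) {
    return (long) columns * (avgChars + 1);
  }

  /** Strings + input Object[] + wider output Object[] + boxed ROW_NUMBER() value. */
  static long heapBytesPerRow(int columns, int avgChars) {
    return columns * stringBytes(avgChars)
        + objectArrayBytes(columns)
        + objectArrayBytes(columns + 1)
        + BOXED_LONG;
  }

  public static void main(String[] args) {
    int columns = 16; // the query selects 16 String columns
    int avgChars = 8; // HYPOTHETICAL average field width
    System.out.println("CSV bytes per row:  " + csvBytesPerRow(columns, avgChars));
    System.out.println("heap bytes per row: " + heapBytesPerRow(columns, avgChars));
  }
}
```

With these assumed values it prints 144 CSV bytes versus 960 heap bytes per row, about 6.7 times larger, and that is before counting list and array backing slots or the extra Object[] built by the result enumerator. Short fields make the ratio worse, because per-object headers dominate.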
My last question was trying to figure out how many times Calcite would have to iterate over the data to produce a result. When I give it a large amount of memory it does eventually finish, but it takes substantially longer than a single pass over the data. For example, a simple select * might take 2-3 minutes for my test file, but the row_number version takes 30+ minutes, if it finishes at all.
Just in case I missed it: Calcite doesn't have a simple counter function like Oracle's ROWNUM, does it?
Thanks
Shawn
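For the specific case in this thread, an unpartitioned ROW_NUMBER() OVER () with no ORDER BY, a single streaming pass is enough in principle, because each row's number depends only on how many rows came before it. The sketch below wraps a hypothetical row iterator and appends a running counter without buffering anything. It is not a Calcite API or planner rule, only an illustration of the one-pass idea, for example for code that sits in front of Calcite.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical one-pass row numbering: no rows are held in memory. */
public class StreamingRowNumber implements Iterator<Object[]> {
  private final Iterator<Object[]> source;
  private long rowNumber = 0;

  public StreamingRowNumber(Iterator<Object[]> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public Object[] next() {
    if (!source.hasNext()) {
      throw new NoSuchElementException();
    }
    Object[] row = source.next();
    Object[] out = Arrays.copyOf(row, row.length + 1);
    out[row.length] = ++rowNumber; // 1-based, like ROW_NUMBER()
    return out;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {"a"});
    rows.add(new Object[] {"b"});
    StreamingRowNumber it = new StreamingRowNumber(rows.iterator());
    // Prints [a, 1] then [b, 2]
    while (it.hasNext()) {
      System.out.println(Arrays.toString(it.next()));
    }
  }
}
```

Since OVER () has no ORDER BY, SQL does not define which row receives which number, so numbering rows in arrival order is a legitimate result; once a partition key or ORDER BY is added, some buffering or sorting becomes unavoidable.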