You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@calcite.apache.org by Feng Zhu <we...@gmail.com> on 2020/01/02 03:55:06 UTC

Re: Understanding the Window Enumerable Process

Hi, Shawn

I was hoping someone could give me a high level overview of how Calcite is
> executing the window function.
>
Calcite runtime generates Java code and complies the code with Janino. For
Window Enumerable Process,
there is a sample piece of code in
*org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*,
which may be what you need.

In general, input rows will be firstly organized into a*
org.apache.calcite.runtime.SortedMultiMap* according to partitioning key,
and then all of the windowed aggregate functions will be evaluated for each
list of rows that have the same partitioning key.

I would expect it to have to iterate through the data at least once for the
> window and once for the data itself.
>
Could you elaborate?

Best,
Feng

Shawn Weeks <sw...@weeksconsulting.us> 于2020年1月1日周三 上午1:23写道:

> Hi, I’m trying to troubleshoot an issue with the Apache NiFi projects use
> of Calcite to allow queries against flow files. NiFi presents an enumerable
> interface to Calcite org.apache.calcite.linq4j.EnumerableDefaults. For
> queries with simple where clause and aggregations everything works great
> but when I use window analytic functions like row_number against larger
> sets of records(2gb) the processes uses all memory allocated and every cpu
> core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in live
> bytes and most of the CPU usage is NiFi parsing the records.
>
> I was hoping someone could give me a high level overview of how Calcite is
> executing the window function. I would expect it to have to iterate through
> the data at least once for the window and once for the data itself.
>
> Here is the query being executed and a trace of the Calcite execution plan.
>
> select  "antiNucleus",
>         "eventFile",
>         "eventNumber",
>         "eventTime",
>         "histFile",
>         "multiplicity",
>         "NaboveLb",
>         "NbelowLb",
>         "NLb",
>         "primaryTracks",
>         "prodTime",
>         "Pt",
>         "runNumber",
>         "vertexX",
>         "vertexY",
>         "vertexZ",
>         row_number() over() id
>     from flowfile;
>
> 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root:
> rel#10:Subset#1.ENUMERABLE.[]
> Original rel:
> LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0],
> eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
> multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
> primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
> vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
> cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
>   FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
> table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
> 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0
> io}, id = 0
>
> Sets:
> Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ)
>         rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
>
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
> Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
> JavaType(class java.lang.String) eventFile, JavaType(class
> java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
> JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
> multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
> java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
> JavaType(class java.lang.String) primaryTracks, JavaType(class
> java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
> JavaType(class java.lang.String) runNumber, JavaType(class
> java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
> JavaType(class java.lang.String) vertexZ, BIGINT id)
>         rel#8:Subset#1.NONE.[], best=null, importance=0.9
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0,
> cumulative cost={inf}
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
>         rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
>
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
> rowcount=100.0, cumulative cost={inf}
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
>
>
> 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#16
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Cheapest plan:
> EnumerableWindow(window#0=[window(partition {} order by [] rows between
> UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount =
> 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
>   FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7,
> 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0
> rows, 101.0 cpu, 0.0 io}, id = 0
>
> 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
> Provenance:
> EnumerableWindow#16
>   direct
>
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
>       call#112 rule [EnumerableWindowRule]
>
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()]))
>           call#64 rule [ProjectToWindowRule:project]
>
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
> OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
>               no parent
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
>   no parent
>
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#17
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> EnumerableWindow#18
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
> HepRelVertex#19
> 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Breadth-first from root:  {
>     HepRelVertex#19 =
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
> {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
> [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
> 0.0 io}
>     HepRelVertex#17 =
> rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
> cost={100.0 rows, 101.0 cpu, 0.0 io}
>

Re: Understanding the Window Enumerable Process

Posted by Feng Zhu <we...@gmail.com>.
The memory issue may be related with the implementation.
For row_number() function, the runtime processing logic is demonstrated as
below.
It can be seen that too many data structures (i.e., list/array/map) will be
created.

Best,
Feng


=====================================================================================================
  *// 1.Loading data set into a List*
  final java.util.List tempList = (java.util.List)
org.apache.calcite.linq4j.Linq4j.asEnumerable(***your data set***).into(new
java.util.ArrayList());

  *// 2.Sorting the data set (Note: a new map&Array&Iterator will be
constructed, see SortedMultiMap)*
  final java.util.Comparator comparator = new java.util.Comparator(){...};
  final java.util.Iterator iterator =
org.apache.calcite.runtime.SortedMultiMap.singletonArrayIterator(comparator,
tempList);

  *// 3.Constructing a new ArrayList for result*
  final java.util.ArrayList _list = new
java.util.ArrayList(tempList.size());

  *// 4. Computing row_number()*
  Long a0w0 = (Long) null; *// for row_number*
  while (iterator.hasNext()) {
    final Object[] _rows = (Object[]) iterator.next();
    for (int i = 0; i < _rows.length; ++i) {
      final org.apache.calcite.test.JdbcTest.Employee row =
(org.apache.calcite.test.JdbcTest.Employee) _rows[i];
      ......
      _list.add(new Object[] {
        ...... // row columns
        a0w0});
    }
  }

  tempList.clear();

  final org.apache.calcite.linq4j.Enumerable _inputEnumerable =
org.apache.calcite.linq4j.Linq4j.asEnumerable(_list);

  return new org.apache.calcite.linq4j.AbstractEnumerable(){
      public org.apache.calcite.linq4j.Enumerator enumerator() {
        return new org.apache.calcite.linq4j.Enumerator(){
            public final org.apache.calcite.linq4j.Enumerator
inputEnumerator = _inputEnumerable.enumerator();
            ......
            public Object current() {
              final Object[] current = (Object[]) inputEnumerator.current();
              return new Object[] { ... // row columns
                };
            }
          };
      }
    };

Shawn Weeks <sw...@weeksconsulting.us> 于2020年1月3日周五 上午12:16写道:

> In my example I'm not setting a partition key as I'm trying to generate
> just a sequential list of integers. It sounds like everything is held in
> memory in a SortedMultiMap which explains the high memory usage though I'm
> not sure why it's using several times the memory as the input record set.
> My test file is a 2gb CSV and I'll hit out of memory with an '-Xmx16g' on
> the process.
>
> My last question was to try and figure out how many times Calcite would
> have to iterate over the data to produce a result as when I give it large
> amounts of memory it does eventually finish it just takes substantially
> longer than the time to iterate through the data once. For example a simple
> select * might take 2-3 minutes for my test file but the row_number version
> takes 30+ minutes if it ever finishes.
>
> Just in case I missed it, Calcite doesn't have a simple counter function
> like Oracles rownum function does it?
>
> Thanks
> Shawn
>
> On 1/1/20, 9:55 PM, "Feng Zhu" <we...@gmail.com> wrote:
>
>     Hi, Shawn
>
>     I was hoping someone could give me a high level overview of how
> Calcite is
>     > executing the window function.
>     >
>     Calcite runtime generates Java code and complies the code with Janino.
> For
>     Window Enumerable Process,
>     there is a sample piece of code in
>
> *org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*,
>     which may be what you need.
>
>     In general, input rows will be firstly organized into a*
>     org.apache.calcite.runtime.SortedMultiMap* according to partitioning
> key,
>     and then all of the windowed aggregate functions will be evaluated for
> each
>     list of rows that have the same partitioning key.
>
>     I would expect it to have to iterate through the data at least once
> for the
>     > window and once for the data itself.
>     >
>     Could you elaborate?
>
>     Best,
>     Feng
>
>     Shawn Weeks <sw...@weeksconsulting.us> 于2020年1月1日周三 上午1:23写道:
>
>     > Hi, I’m trying to troubleshoot an issue with the Apache NiFi
> projects use
>     > of Calcite to allow queries against flow files. NiFi presents an
> enumerable
>     > interface to Calcite org.apache.calcite.linq4j.EnumerableDefaults.
> For
>     > queries with simple where clause and aggregations everything works
> great
>     > but when I use window analytic functions like row_number against
> larger
>     > sets of records(2gb) the processes uses all memory allocated and
> every cpu
>     > core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in
> live
>     > bytes and most of the CPU usage is NiFi parsing the records.
>     >
>     > I was hoping someone could give me a high level overview of how
> Calcite is
>     > executing the window function. I would expect it to have to iterate
> through
>     > the data at least once for the window and once for the data itself.
>     >
>     > Here is the query being executed and a trace of the Calcite
> execution plan.
>     >
>     > select  "antiNucleus",
>     >         "eventFile",
>     >         "eventNumber",
>     >         "eventTime",
>     >         "histFile",
>     >         "multiplicity",
>     >         "NaboveLb",
>     >         "NbelowLb",
>     >         "NLb",
>     >         "primaryTracks",
>     >         "prodTime",
>     >         "Pt",
>     >         "runNumber",
>     >         "vertexX",
>     >         "vertexY",
>     >         "vertexZ",
>     >         row_number() over() id
>     >     from flowfile;
>     >
>     > 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> Root:
>     > rel#10:Subset#1.ENUMERABLE.[]
>     > Original rel:
>     > LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]],
> antiNucleus=[$0],
>     > eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
>     > multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
>     > primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
>     > vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER
> (ROWS
>     > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
>     > cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
>     >   FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
>     > table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
> 12, 13,
>     > 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0
> cpu, 0.0
>     > io}, id = 0
>     >
>     > Sets:
>     > Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
>     > JavaType(class java.lang.String) eventFile, JavaType(class
>     > java.lang.String) eventNumber, JavaType(class java.lang.String)
> eventTime,
>     > JavaType(class java.lang.String) histFile, JavaType(class
> java.lang.String)
>     > multiplicity, JavaType(class java.lang.String) NaboveLb,
> JavaType(class
>     > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
>     > JavaType(class java.lang.String) primaryTracks, JavaType(class
>     > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
>     > JavaType(class java.lang.String) runNumber, JavaType(class
>     > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
>     > JavaType(class java.lang.String) vertexZ)
>     >         rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
>     >
>     > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
>     > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0,
> cumulative
>     > cost={100.0 rows, 101.0 cpu, 0.0 io}
>     > Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
>     > JavaType(class java.lang.String) eventFile, JavaType(class
>     > java.lang.String) eventNumber, JavaType(class java.lang.String)
> eventTime,
>     > JavaType(class java.lang.String) histFile, JavaType(class
> java.lang.String)
>     > multiplicity, JavaType(class java.lang.String) NaboveLb,
> JavaType(class
>     > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
>     > JavaType(class java.lang.String) primaryTracks, JavaType(class
>     > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
>     > JavaType(class java.lang.String) runNumber, JavaType(class
>     > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
>     > JavaType(class java.lang.String) vertexZ, BIGINT id)
>     >         rel#8:Subset#1.NONE.[], best=null, importance=0.9
>     >
>     >
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
>     > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)),
> rowcount=100.0,
>     > cumulative cost={inf}
>     >
>     >
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
>     >         rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
>     >
>     >
> rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
>     > rowcount=100.0, cumulative cost={inf}
>     >
>     >
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0
> cpu,
>     > 0.0 io}
>     >
>     >
>     > 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > EnumerableWindow#16
>     > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
>     > Cheapest plan:
>     > EnumerableWindow(window#0=[window(partition {} order by [] rows
> between
>     > UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount
> =
>     > 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
>     >   FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5,
> 6, 7,
>     > 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost =
> {100.0
>     > rows, 101.0 cpu, 0.0 io}, id = 0
>     >
>     > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
>     > Provenance:
>     > EnumerableWindow#16
>     >   direct
>     >
>     >
> rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()]))
>     >       call#112 rule [EnumerableWindowRule]
>     >
>     >
> rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()]))
>     >           call#64 rule [ProjectToWindowRule:project]
>     >
>     >
> rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
>     > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
>     >               no parent
>     > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
>     > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
>     >   no parent
>     >
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > HepRelVertex#17
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > EnumerableWindow#18
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
> new
>     > HepRelVertex#19
>     > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
>     > Breadth-first from root:  {
>     >     HepRelVertex#19 =
>     >
> rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
>     > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
>     > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0
> cpu,
>     > 0.0 io}
>     >     HepRelVertex#17 =
>     > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1,
> 2, 3,
>     > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0,
> cumulative
>     > cost={100.0 rows, 101.0 cpu, 0.0 io}
>     >
>
>
>

Re: Understanding the Window Enumerable Process

Posted by Shawn Weeks <sw...@weeksconsulting.us>.
In my example I'm not setting a partition key as I'm trying to generate just a sequential list of integers. It sounds like everything is held in memory in a SortedMultiMap which explains the high memory usage though I'm not sure why it's using several times the memory as the input record set. My test file is a 2gb CSV and I'll hit out of memory with an '-Xmx16g' on the process.

My last question was to try and figure out how many times Calcite would have to iterate over the data to produce a result as when I give it large amounts of memory it does eventually finish it just takes substantially longer than the time to iterate through the data once. For example a simple select * might take 2-3 minutes for my test file but the row_number version takes 30+ minutes if it ever finishes.

Just in case I missed it, Calcite doesn't have a simple counter function like Oracles rownum function does it?

Thanks
Shawn

On 1/1/20, 9:55 PM, "Feng Zhu" <we...@gmail.com> wrote:

    Hi, Shawn
    
    I was hoping someone could give me a high level overview of how Calcite is
    > executing the window function.
    >
    Calcite runtime generates Java code and complies the code with Janino. For
    Window Enumerable Process,
    there is a sample piece of code in
    *org.apache.calcite.adapter.enumerable.EnumerableWindow#sampleOfTheGeneratedWindowedAggregate()*,
    which may be what you need.
    
    In general, input rows will be firstly organized into a*
    org.apache.calcite.runtime.SortedMultiMap* according to partitioning key,
    and then all of the windowed aggregate functions will be evaluated for each
    list of rows that have the same partitioning key.
    
    I would expect it to have to iterate through the data at least once for the
    > window and once for the data itself.
    >
    Could you elaborate?
    
    Best,
    Feng
    
    Shawn Weeks <sw...@weeksconsulting.us> 于2020年1月1日周三 上午1:23写道:
    
    > Hi, I’m trying to troubleshoot an issue with the Apache NiFi projects use
    > of Calcite to allow queries against flow files. NiFi presents an enumerable
    > interface to Calcite org.apache.calcite.linq4j.EnumerableDefaults. For
    > queries with simple where clause and aggregations everything works great
    > but when I use window analytic functions like row_number against larger
    > sets of records(2gb) the processes uses all memory allocated and every cpu
    > core. I’ve attached a profiler and I’m only ever seeing 2-3 MB in live
    > bytes and most of the CPU usage is NiFi parsing the records.
    >
    > I was hoping someone could give me a high level overview of how Calcite is
    > executing the window function. I would expect it to have to iterate through
    > the data at least once for the window and once for the data itself.
    >
    > Here is the query being executed and a trace of the Calcite execution plan.
    >
    > select  "antiNucleus",
    >         "eventFile",
    >         "eventNumber",
    >         "eventTime",
    >         "histFile",
    >         "multiplicity",
    >         "NaboveLb",
    >         "NbelowLb",
    >         "NLb",
    >         "primaryTracks",
    >         "prodTime",
    >         "Pt",
    >         "runNumber",
    >         "vertexX",
    >         "vertexY",
    >         "vertexZ",
    >         row_number() over() id
    >     from flowfile;
    >
    > 7199 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - Root:
    > rel#10:Subset#1.ENUMERABLE.[]
    > Original rel:
    > LogicalProject(subset=[rel#10:Subset#1.ENUMERABLE.[]], antiNucleus=[$0],
    > eventFile=[$1], eventNumber=[$2], eventTime=[$3], histFile=[$4],
    > multiplicity=[$5], NaboveLb=[$6], NbelowLb=[$7], NLb=[$8],
    > primaryTracks=[$9], prodTime=[$10], Pt=[$11], runNumber=[$12],
    > vertexX=[$13], vertexY=[$14], vertexZ=[$15], id=[ROW_NUMBER() OVER (ROWS
    > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)]): rowcount = 100.0,
    > cumulative cost = {100.0 rows, 1700.0 cpu, 0.0 io}, id = 7
    >   FlowFileTableScan(subset=[rel#6:Subset#0.ENUMERABLE.[]],
    > table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
    > 14, 15]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0
    > io}, id = 0
    >
    > Sets:
    > Set#0, type: RecordType(JavaType(class java.lang.String) antiNucleus,
    > JavaType(class java.lang.String) eventFile, JavaType(class
    > java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
    > JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
    > multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
    > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
    > JavaType(class java.lang.String) primaryTracks, JavaType(class
    > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
    > JavaType(class java.lang.String) runNumber, JavaType(class
    > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
    > JavaType(class java.lang.String) vertexZ)
    >         rel#6:Subset#0.ENUMERABLE.[], best=rel#0, importance=0.81
    >
    > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
    > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
    > cost={100.0 rows, 101.0 cpu, 0.0 io}
    > Set#1, type: RecordType(JavaType(class java.lang.String) antiNucleus,
    > JavaType(class java.lang.String) eventFile, JavaType(class
    > java.lang.String) eventNumber, JavaType(class java.lang.String) eventTime,
    > JavaType(class java.lang.String) histFile, JavaType(class java.lang.String)
    > multiplicity, JavaType(class java.lang.String) NaboveLb, JavaType(class
    > java.lang.String) NbelowLb, JavaType(class java.lang.String) NLb,
    > JavaType(class java.lang.String) primaryTracks, JavaType(class
    > java.lang.String) prodTime, JavaType(class java.lang.String) Pt,
    > JavaType(class java.lang.String) runNumber, JavaType(class
    > java.lang.String) vertexX, JavaType(class java.lang.String) vertexY,
    > JavaType(class java.lang.String) vertexZ, BIGINT id)
    >         rel#8:Subset#1.NONE.[], best=null, importance=0.9
    >
    > rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
    > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)), rowcount=100.0,
    > cumulative cost={inf}
    >
    > rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={inf}
    >         rel#10:Subset#1.ENUMERABLE.[], best=rel#15, importance=1.0
    >
    > rel#11:AbstractConverter.ENUMERABLE.[](input=rel#8:Subset#1.NONE.[],convention=ENUMERABLE,sort=[]),
    > rowcount=100.0, cumulative cost={inf}
    >
    > rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
    > 0.0 io}
    >
    >
    > 7200 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > EnumerableWindow#16
    > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
    > Cheapest plan:
    > EnumerableWindow(window#0=[window(partition {} order by [] rows between
    > UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]): rowcount =
    > 100.0, cumulative cost = {200.0 rows, 301.0 cpu, 0.0 io}, id = 16
    >   FlowFileTableScan(table=[[FLOWFILE]], fields=[[0, 1, 2, 3, 4, 5, 6, 7,
    > 8, 9, 10, 11, 12, 13, 14, 15]]): rowcount = 100.0, cumulative cost = {100.0
    > rows, 101.0 cpu, 0.0 io}, id = 0
    >
    > 7215 [pool-1-thread-1] DEBUG org.apache.calcite.plan.RelOptPlanner -
    > Provenance:
    > EnumerableWindow#16
    >   direct
    >
    > rel#15:EnumerableWindow.ENUMERABLE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()]))
    >       call#112 rule [EnumerableWindowRule]
    >
    > rel#13:LogicalWindow.NONE.[[]](input=rel#6:Subset#0.ENUMERABLE.[],window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()]))
    >           call#64 rule [ProjectToWindowRule:project]
    >
    > rel#7:LogicalProject.NONE.[](input=rel#6:Subset#0.ENUMERABLE.[],antiNucleus=$0,eventFile=$1,eventNumber=$2,eventTime=$3,histFile=$4,multiplicity=$5,NaboveLb=$6,NbelowLb=$7,NLb=$8,primaryTracks=$9,prodTime=$10,Pt=$11,runNumber=$12,vertexX=$13,vertexY=$14,vertexZ=$15,id=ROW_NUMBER()
    > OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))
    >               no parent
    > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
    > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
    >   no parent
    >
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > HepRelVertex#17
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > EnumerableWindow#18
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner - new
    > HepRelVertex#19
    > 7215 [pool-1-thread-1] TRACE org.apache.calcite.plan.RelOptPlanner -
    > Breadth-first from root:  {
    >     HepRelVertex#19 =
    > rel#18:EnumerableWindow.ENUMERABLE.[[]](input=HepRelVertex#17,window#0=window(partition
    > {} order by [] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs
    > [ROW_NUMBER()])), rowcount=100.0, cumulative cost={200.0 rows, 301.0 cpu,
    > 0.0 io}
    >     HepRelVertex#17 =
    > rel#0:FlowFileTableScan.ENUMERABLE.[](table=[FLOWFILE],fields=[0, 1, 2, 3,
    > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]), rowcount=100.0, cumulative
    > cost={100.0 rows, 101.0 cpu, 0.0 io}
    >