Posted to issues@beam.apache.org by "Akshay Iyangar (Jira)" <ji...@apache.org> on 2019/09/26 23:20:00 UTC

[jira] [Comment Edited] (BEAM-8212) StatefulParDoFn creates GC timers for every record

    [ https://issues.apache.org/jira/browse/BEAM-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937228#comment-16937228 ] 

Akshay Iyangar edited comment on BEAM-8212 at 9/26/19 11:19 PM:
----------------------------------------------------------------

 
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import org.junit.Test;
// Plain Guava is assumed here; Beam's vendored Guava BaseEncoding behaves the same way.
import com.google.common.io.BaseEncoding;

public class TestDecodeTimer {
  @Test
  public void gctimerValue() throws IOException {

    StateNamespace stateNamespace = StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);

    String GC_TIMER_ID = "__StatefulParDoGcTimerId";
    // This mirrors how the runner sets the GC timer:
    // timerInternals.setTimer(
    //     StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);

    // Encode the timer id followed by the namespace key, as they appear in the RocksDB key.
    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(GC_TIMER_ID, outStream);
    StringUtf8Coder.of().encode(stateNamespace.stringKey(), outStream);

    System.out.println("The output stream is: " + outStream.toString()); // __StatefulParDoGcTimerId//
    // Find the hex representation of these bytes.
    String encode = BaseEncoding.base16().encode(outStream.toByteArray());
    System.out.println("The encoded string is " + encode); // 185F5F537461746566756C506172446F476354696D65724964022F2F
    // Everything after this prefix is the GC timer timestamp, followed by the encoded time domain,
    // so the time domain suffix has to be stripped as well.

    ByteArrayOutputStream outStream1 = new ByteArrayOutputStream();
    StringUtf8Coder.of().encode(TimeDomain.EVENT_TIME.toString(), outStream1);
    String encode1 = BaseEncoding.base16().encode(outStream1.toByteArray());
    System.out.println("The encoded1 string is " + encode1); // 0A4556454E545F54494D45
    System.out.println("Total length of the encoded key: " + outStream.size());

    // Example key taken from RocksDB
    String decode = "008020C49BA0BCF7F901006A6176612E6E696F2E48656170427974654275666665F20100010C0107313831303639000C01000000185F5F537461746566756C506172446F476354696D65724964022F2F8020C49BA0BCF7F80A4556454E545F54494D45";

    // The timer is whatever lies between 185F5F537461746566756C506172446F476354696D65724964022F2F
    // and 0A4556454E545F54494D45, i.e. 8020C49BA0BCF7F8.
    Instant timeDecode = InstantCoder.of().decode(new ByteArrayInputStream(BaseEncoding.base16().decode(
        "8020C49BA0BCF7F8")));

    System.out.println("GC timer for Global Window is " + timeDecode);
    // These bytes decode to 294247-01-09T04:00:54.776Z, within a day of
    // BoundedWindow.TIMESTAMP_MAX_VALUE (294247-01-10T04:00:54.775Z).
    // That is effectively +infinity, so these timers are never cleaned up because the window never closes.

    // Cross-check against the maximum timestamp.
    System.out.println("MAX value: " + BoundedWindow.TIMESTAMP_MAX_VALUE);
    System.out.println("MAX: " + GlobalWindow.TIMESTAMP_MAX_VALUE);
  }
}
{code}
I wrote a test to check which timer values are being generated for each event. I took one key from RocksDB to analyze, and its timer is effectively +Infinity (within a day of GlobalWindow.TIMESTAMP_MAX_VALUE), which makes sense as it's a global window.
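
The same extraction can be sketched without Beam on the classpath. This is only an illustration: it assumes InstantCoder writes (millis - Long.MIN_VALUE) as an 8-byte big-endian long, and the class and method names (GcTimerKeyDecoder, between, decodeInstantMillis) are made up for this example.

```java
import java.time.Instant;

final class GcTimerKeyDecoder {
    // Hex of the encoded timer id plus namespace, and of the encoded time domain (both from the test above).
    static final String TIMER_ID_AND_NAMESPACE = "185F5F537461746566756C506172446F476354696D65724964022F2F";
    static final String EVENT_TIME = "0A4556454E545F54494D45";
    // Example key taken from RocksDB (from the test above).
    static final String EXAMPLE_KEY =
        "008020C49BA0BCF7F901006A6176612E6E696F2E48656170427974654275666665F20100010C0107313831303639000C01000000"
            + "185F5F537461746566756C506172446F476354696D65724964022F2F"
            + "8020C49BA0BCF7F8"
            + "0A4556454E545F54494D45";

    /** Returns the hex between the first occurrence of prefix and the next occurrence of suffix. */
    static String between(String hex, String prefix, String suffix) {
        int prefixAt = hex.indexOf(prefix);
        if (prefixAt < 0) {
            throw new IllegalArgumentException("prefix not found");
        }
        int start = prefixAt + prefix.length();
        int end = hex.indexOf(suffix, start);
        if (end < 0) {
            throw new IllegalArgumentException("suffix not found");
        }
        return hex.substring(start, end);
    }

    /** Undoes the assumed InstantCoder shift: the stored value is millis - Long.MIN_VALUE. */
    static long decodeInstantMillis(String hex16) {
        long shifted = Long.parseUnsignedLong(hex16, 16);
        return shifted + Long.MIN_VALUE; // wraps around, which restores the original millis
    }

    public static void main(String[] args) {
        String timerHex = between(EXAMPLE_KEY, TIMER_ID_AND_NAMESPACE, EVENT_TIME);
        long millis = decodeInstantMillis(timerHex);
        System.out.println("timer hex:    " + timerHex);
        System.out.println("timer millis: " + millis);
        System.out.println("timer time:   " + Instant.ofEpochMilli(millis));
        System.out.println("millis below Long.MAX_VALUE / 1000: " + (Long.MAX_VALUE / 1000 - millis));
    }
}
```

For the example key, the decoded timestamp is less than one day below Long.MAX_VALUE / 1000 milliseconds, so no realistic watermark will ever reach it.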

 

Also, with the timers disabled for global windows, I didn't see any keys associated with timers in the StatefulParDoFn. The scan
{code:java}
rocksdb_ldb --db=db --column_family=_timer_state/event_beam-timer scan --max_keys=100 --key_hex
{code}
returned zero keys.
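
A minimal sketch of what skipping the GC timer for the global window could look like. It uses hypothetical stand-in types (Window, RecordingTimers) rather than the real StatefulDoFnRunner classes, so treat it as an illustration of the idea, not as the actual change.

```java
import java.util.ArrayList;
import java.util.List;

final class GcTimerGuardSketch {
    /** Hypothetical stand-in for a window: whether it is the global window, and its GC time. */
    static final class Window {
        final boolean global;
        final long gcTimeMillis;

        Window(boolean global, long gcTimeMillis) {
            this.global = global;
            this.gcTimeMillis = gcTimeMillis;
        }
    }

    /** Hypothetical stand-in for the runner's timer store; it records every timer that is set. */
    static final class RecordingTimers {
        final List<Long> eventTimeTimers = new ArrayList<>();

        void setEventTimeTimer(long timestampMillis) {
            eventTimeTimers.add(timestampMillis);
        }
    }

    /** Sets a GC timer for the window unless it is the global window. */
    static void setGcTimerIfNeeded(Window window, RecordingTimers timers) {
        if (window.global) {
            return; // the global window never closes in streaming, so the timer would never fire
        }
        timers.setEventTimeTimer(window.gcTimeMillis);
    }

    public static void main(String[] args) {
        RecordingTimers timers = new RecordingTimers();
        // Three records in the global window: no timers are created.
        for (int record = 0; record < 3; record++) {
            setGcTimerIfNeeded(new Window(true, Long.MAX_VALUE / 1000), timers);
        }
        // One record in a bounded window: exactly one timer is created.
        setGcTimerIfNeeded(new Window(false, 1_000L), timers);
        System.out.println("timers set: " + timers.eventTimeTimers.size());
    }
}
```

Whether skipping the timer is safe once a bounded source drains and the watermark reaches the end of the global window is a separate question that this sketch does not address.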

 

I ran a big pipeline to see the effect of having the timers disabled, with Global Window and RocksDB as the state backend. The node is a 32 GB EKS node, of which I gave 15 GB to the JVM.

With the timers disabled, the pipeline had consumed 432 million records at the 1-hour mark, with node memory usage at roughly 50%.

With the timers enabled, the same pipeline took 1 hr 30 min to read 432 million records, with total node memory usage at 62%.

So I think it is fair to assume that for global windows the timers can affect pipeline performance.
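
To put the two runs side by side, the arithmetic below turns the figures above into throughput. It is a back-of-the-envelope sketch: the only inputs are the record count and the two durations quoted above, and the class name is made up.

```java
final class ThroughputComparison {
    /** Average throughput in records per second. */
    static double recordsPerSecond(long records, long seconds) {
        return (double) records / seconds;
    }

    public static void main(String[] args) {
        long records = 432_000_000L;
        double oneHourRun = recordsPerSecond(records, 60 * 60);      // the 1-hour run
        double ninetyMinRun = recordsPerSecond(records, 90 * 60);    // the 1 hr 30 min run

        System.out.println("1 h run:        " + oneHourRun + " records/s");
        System.out.println("1 h 30 min run: " + ninetyMinRun + " records/s");
        System.out.println("speed ratio:    " + (oneHourRun / ninetyMinRun));
    }
}
```

That works out to 120,000 versus 80,000 records per second, so the faster run had 1.5 times the throughput of the slower one.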

[~mxm] and [~NathanHowell] ^^

was (Author: aiyangar):
 

I also disabled the timers for global windows to do some benchmarking and found that RocksDB no longer generates any state for WindowDoFnOperator. Previously it generated a directory like the one below.
{code:java}
/rocksdb/job_00000000000000000000000000000000_op_WindowDoFnOperator_e2c1f521beded61187c1d16f3c146358__3_3__uuid_4e0e102b-ffcd-4111-80f7-b9a8f318d04a/db
{code}
 


 

I'm running the pipeline to get exact benchmarks and will keep you updated. One thing is clear right off the bat: we see fewer stateful operators, which means RocksDB has more memory to work with, since it holds (operators * parallelism) fewer states than in the previous run. I will update shortly with respect to the speed and throughput of the pipeline.

> StatefulParDoFn creates GC timers for every record 
> ---------------------------------------------------
>
>                 Key: BEAM-8212
>                 URL: https://issues.apache.org/jira/browse/BEAM-8212
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Akshay Iyangar
>            Priority: Major
>
> Hi,
> Currently the StatefulParDoFn creates timers for all the records.
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L211]
> This becomes a problem if you are using GlobalWindows for streaming, where these timers get created and never get cleaned up since the window never closes.
> This is especially a problem if you are memory bound in RocksDB, where these timers take up space and slow the pipelines considerably.
> I was wondering whether, if the pipeline runs in global windows, we should avoid adding timers at all?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)