Posted to dev@storm.apache.org by zd-project <gi...@git.apache.org> on 2018/06/28 20:10:15 UTC

[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...

Github user zd-project commented on the issue:

    https://github.com/apache/storm/pull/2743
  
    Explanation of the abstraction and its usage.
    
    ```java
    //Interface that provides the #stopTiming and #hasStopped helper methods
    public interface TimerDecorated extends AutoCloseable {}
    
    //Subclass of LocalAssignment
    public class TimerDecoratedAssignment extends LocalAssignment implements TimerDecorated {}
    
    //Base implementation of Timed decorators
    public class Timed<T> implements TimerDecorated {}
    public class TimedResource<T extends AutoCloseable> extends Timed<T> {}
    public class TimedWritableByteChannel extends TimedResource<WritableByteChannel> implements WritableByteChannel {}
    ```
    
    - Tracking time on the topology jar upload to Nimbus
    	- Originally, Nimbus is notified of the upload through the callbacks `beginFileUpload()`, `uploadChunk()`, and `finishFileUpload()`, all of which depend on a map from `Path` to `WritableByteChannel`. `WritableByteChannel` is the object that actually handles data I/O, so by wrapping it in a decorator, `TimedWritableByteChannel`, that itself implements `WritableByteChannel`, we can track the timing with a one-line change in `Nimbus.java`:
    
    
    ```java
    public String beginFileUpload() throws AuthorizationException, TException {
        try {
            ...
            String fileloc = ...;
    -       uploaders.put(fileloc, Channels.newChannel(new FileOutputStream(fileloc)));
    +       uploaders.put(fileloc, new TimedWritableByteChannel(Channels.newChannel(new FileOutputStream(fileloc)), fileUploadDuration));
            return fileloc;
        } catch (Exception e) {
            ...;
        }
    }
    ```
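
    To illustrate why the decorator only needs a one-line change at the call site, here is a minimal sketch of the pattern. It is not the code from this PR: `TimedChannelSketch` and its `LongConsumer` duration sink are hypothetical names, used so the example runs without a metrics library. The wrapper forwards all I/O to the wrapped channel and reports the elapsed time once, when the channel is closed.

    ```java
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import java.util.function.LongConsumer;

    // Hypothetical stand-in for TimedWritableByteChannel; not the actual Storm class.
    class TimedChannelSketch implements WritableByteChannel {
        private final WritableByteChannel delegate;
        private final LongConsumer durationSink; // assumed sink; receives elapsed nanoseconds
        private final long startNanos = System.nanoTime(); // timing starts at construction
        private boolean stopped = false;

        TimedChannelSketch(WritableByteChannel delegate, LongConsumer durationSink) {
            this.delegate = delegate;
            this.durationSink = durationSink;
        }

        @Override
        public int write(ByteBuffer src) throws IOException {
            // Pure delegation: callers see an ordinary WritableByteChannel.
            return delegate.write(src);
        }

        @Override
        public boolean isOpen() {
            return delegate.isOpen();
        }

        @Override
        public void close() throws IOException {
            try {
                delegate.close();
            } finally {
                // Report the duration only once, even if close() is called again.
                if (!stopped) {
                    stopped = true;
                    durationSink.accept(System.nanoTime() - startNanos);
                }
            }
        }
    }
    ```

    Because the wrapper is itself a `WritableByteChannel`, code that looks the channel up in the map and later writes to or closes it needs no changes; only the place where the channel is created has to know about the timing.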
    
    - Tracking time for a worker to launch an assignment
    	- A worker slot receives assignments asynchronously and executes them according to a state-machine model. We can attach the timer to the assignment itself to minimize refactoring. However, because `LocalAssignment` is generated by Thrift and does not have a complete interface, a decorator is not ideal. Hence I chose to inherit from `LocalAssignment` instead and declare a `TimerDecoratedAssignment` class that implements `TimerDecorated` directly. This is less intrusive to the existing code.
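
    The inheritance approach can be sketched roughly as follows. All names here are hypothetical: `GeneratedAssignment` stands in for the Thrift-generated `LocalAssignment`, and the interface only mirrors the `#stopTiming` and `#hasStopped` helpers mentioned above; their real signatures in the PR may differ.

    ```java
    import java.util.function.LongConsumer;

    // Hypothetical mirror of TimerDecorated; method signatures are assumptions.
    interface TimerDecoratedSketch extends AutoCloseable {
        long stopTiming();

        boolean hasStopped();

        @Override
        default void close() {
            stopTiming();
        }
    }

    // Hypothetical stand-in for the Thrift-generated LocalAssignment (a concrete class, no interface).
    class GeneratedAssignment {
        private final String topologyId;

        GeneratedAssignment(String topologyId) {
            this.topologyId = topologyId;
        }

        GeneratedAssignment(GeneratedAssignment other) {
            this.topologyId = other.topologyId;
        }

        String getTopologyId() {
            return topologyId;
        }
    }

    // Subclassing keeps the object usable wherever the generated type is expected.
    class TimedAssignmentSketch extends GeneratedAssignment implements TimerDecoratedSketch {
        private final LongConsumer durationSink; // assumed sink; receives elapsed nanoseconds
        private final long startNanos = System.nanoTime();
        private long elapsedNanos = -1; // -1 means the timer is still running

        TimedAssignmentSketch(GeneratedAssignment other, LongConsumer durationSink) {
            super(other);
            this.durationSink = durationSink;
        }

        @Override
        public long stopTiming() {
            if (elapsedNanos < 0) {
                elapsedNanos = System.nanoTime() - startNanos;
                durationSink.accept(elapsedNanos);
            }
            return elapsedNanos;
        }

        @Override
        public boolean hasStopped() {
            return elapsedNanos >= 0;
        }
    }
    ```

    Since the timed object is still an instance of the generated class, existing state-machine code can pass it around unchanged, and whichever state completes the launch can stop the timer.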


---