Posted to dev@atlas.apache.org by Ashutosh Mestry <am...@hortonworks.com> on 2017/08/15 16:58:51 UTC

Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
-----------------------------------------------------------

Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Bugs: ATLAS-2047
    https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
-------

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.

**Implementation**

Please take a look at this Scala code from _ShutdownableThread_. The thread itself handles all exceptions: when one is thrown, it leaves the _isRunning_ loop, logs the error only if the thread is still marked as running, and counts down the _shutdownLatch_ (from yesterday’s bug fix).
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

Moved the _try...catch_ block so that exception handling is left to _ShutdownableThread_.
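To illustrate why leaving exception handling to the base class helps, here is a minimal Java sketch that mirrors the Scala `run()` loop above. `SketchShutdownableThread` and `FailingWorker` are hypothetical names, not Kafka or Atlas classes, and the error is recorded in a field rather than logged. Because `doWork()` has no _try...catch_ of its own, the first exception ends the loop and releases the latch. A worker that caught and logged inside `doWork()` would instead return normally and be called again on the next iteration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical Java analogue of the Scala run() loop above; not Kafka's class.
abstract class SketchShutdownableThread extends Thread {
    final AtomicBoolean isRunning = new AtomicBoolean(true);
    final CountDownLatch shutdownLatch = new CountDownLatch(1);
    volatile Throwable lastError; // recorded here instead of logged, for illustration

    abstract void doWork() throws Exception;

    @Override
    public void run() {
        try {
            while (isRunning.get()) {
                doWork();
            }
        } catch (Throwable e) {
            // Report the failure only if no shutdown was requested,
            // mirroring `if (isRunning.get()) error(...)` in the Scala code.
            if (isRunning.get()) {
                lastError = e;
            }
        }
        shutdownLatch.countDown(); // always signal that run() has exited
    }

    void initiateShutdown() {
        isRunning.set(false);
    }
}

// Hypothetical worker: doWork() has no try/catch of its own, so an exception
// ends the loop in the base class instead of being logged on every iteration.
class FailingWorker extends SketchShutdownableThread {
    int calls = 0;

    @Override
    void doWork() {
        calls++;
        if (calls == 3) {
            throw new IllegalStateException("consumer already closed");
        }
    }
}
```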


Diffs
-----

  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 


Diff: https://reviews.apache.org/r/61665/diff/1/


Testing
-------

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Ashutosh Mestry <am...@hortonworks.com>.

> On Aug. 16, 2017, 9:59 a.m., Nixon Rodrigues wrote:
> > Ship It!

Can you please review? I have updated the patch with retry logic.


- Ashutosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183030
-----------------------------------------------------------


On Aug. 15, 2017, 4:58 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 15, 2017, 4:58 p.m.)


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Nixon Rodrigues <ni...@freestoneinfotech.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183030
-----------------------------------------------------------


Ship it!

- Nixon Rodrigues


On Aug. 15, 2017, 4:58 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 15, 2017, 4:58 p.m.)


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Madhan Neethiraj <ma...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184102
-----------------------------------------------------------


Fix it, then Ship it!





webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 246 (patched)
<https://reviews.apache.org/r/61665/#comment260305>

    Instead of computing timeSinceLastWait here and passing it to setWaitDurations(), I would suggest the following:
    
    public void pause(Exception ex) {
      setWaitDuration();
      
      try {
        ...
        Thread.sleep(waitDuration);
        ...
      } catch(...) {
        ...
      }
    }
    
    private void setWaitDuration() {
      long now               = System.currentTimeMillis();
      long timeSinceLastWait = now - lastWaitAt; // lastWaitAt will be 0 for the first time, which will result in "waitDuration = minDuration" in the following if block
    
      lastWaitAt = now;
    
      if (timeSinceLastWait > resetInterval) {
        waitDuration = minDuration;
      } else if (waitDuration != maxDuration) {
        waitDuration += increment;
        
        if (waitDuration > maxDuration) {
          waitDuration = maxDuration;
        }
      }
    }
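The suggested `setWaitDuration()` logic can be exercised in isolation. The following is a sketch, not the committed `AdaptiveWaiter`. The class name `AdaptiveWaiterSketch`, the constructor parameters, and the injectable clock are assumptions. The clock is a `LongSupplier` used in place of a direct `System.currentTimeMillis()` call, so the duration sequence can be checked without sleeping.

```java
import java.util.function.LongSupplier;

// Sketch of the setWaitDuration() logic suggested above. Names and the
// injectable clock are assumptions, not the committed AdaptiveWaiter.
class AdaptiveWaiterSketch {
    private final long minDuration;
    private final long maxDuration;
    private final long increment;
    private final long resetInterval;
    private final LongSupplier clock; // e.g. System::currentTimeMillis in production

    private long waitDuration;
    private long lastWaitAt = 0; // with a wall-clock source, 0 makes the first call reset

    AdaptiveWaiterSketch(long minDuration, long maxDuration, long increment,
                         long resetInterval, LongSupplier clock) {
        this.minDuration   = minDuration;
        this.maxDuration   = maxDuration;
        this.increment     = increment;
        this.resetInterval = resetInterval;
        this.clock         = clock;
        this.waitDuration  = minDuration;
    }

    // Computes and returns the next wait duration.
    long setWaitDuration() {
        long now               = clock.getAsLong();
        long timeSinceLastWait = now - lastWaitAt;

        lastWaitAt = now;

        if (timeSinceLastWait > resetInterval) {
            waitDuration = minDuration; // quiet for a while: start over
        } else if (waitDuration != maxDuration) {
            waitDuration = Math.min(waitDuration + increment, maxDuration); // grow, capped
        }
        return waitDuration;
    }

    // Sleeps for the next wait duration; interruption is left to the caller.
    void pause() throws InterruptedException {
        Thread.sleep(setWaitDuration());
    }
}
```

Suppose the minimum is 500 ms, the maximum 2000 ms, the increment 500 ms, and the reset interval 4000 ms. Closely spaced calls then yield 500, 1000, 1500, 2000, 2000. A call after more than 4 s of quiet drops back to 500.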



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 281 (patched)
<https://reviews.apache.org/r/61665/#comment260301>

    Use maxWaitDuration from line #127, instead of creating a new one here.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 283 (patched)
<https://reviews.apache.org/r/61665/#comment260302>

    500 ==> minWaitDuration


- Madhan Neethiraj


On Aug. 29, 2017, 10:37 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2017, 10:37 p.m.)


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Madhan Neethiraj <ma...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184234
-----------------------------------------------------------


Ship it!

- Madhan Neethiraj


On Aug. 31, 2017, 4:23 a.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 31, 2017, 4:23 a.m.)


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Ashutosh Mestry <am...@hortonworks.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
-----------------------------------------------------------

(Updated Aug. 31, 2017, 4:23 a.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
-------

Updates include:
- Addressed review comments.
- Updated unit test.


Bugs: ATLAS-2047
    https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
-------

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.

**Background**

The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_, which is called at the beginning of almost every method in that class. It checks whether the consumer is closed and, if so, throws _IllegalStateException_.

The scenario can arise as follows:
- Shutdown has been initiated, and _close_ is called on the consumer.
- However, the consumer thread is just about to enter another poll cycle.
- When that poll cycle starts, _acquire_ sees that the consumer is closed and throws the exception.
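The three steps above can be replayed deterministically on a single thread. `GuardedConsumer` and `ShutdownRaceReplay` are hypothetical stand-ins. They only mimic the described behaviour of an `acquire()` guard that throws `IllegalStateException` once `close()` has been called, and are not the real `KafkaConsumer` API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a consumer whose public methods begin with an
// acquire()-style guard, as described above. Not the real KafkaConsumer API.
class GuardedConsumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private void acquire() {
        if (closed.get()) {
            throw new IllegalStateException("consumer already closed");
        }
    }

    String poll() {
        acquire();
        return "record";
    }

    void close() {
        closed.set(true);
    }
}

class ShutdownRaceReplay {
    // Replays the three steps in a fixed order on one thread.
    static String replay() {
        AtomicBoolean isRunning = new AtomicBoolean(true);
        GuardedConsumer consumer = new GuardedConsumer();

        consumer.poll();        // a normal poll cycle
        isRunning.set(false);   // step 1: shutdown initiated ...
        consumer.close();       //         ... and close() called on the consumer
        try {
            consumer.poll();    // step 2: the consumer thread enters another poll cycle
            return "no exception";
        } catch (IllegalStateException e) {
            // step 3: acquire() sees the closed consumer and throws. isRunning is
            // already false, so this is the expected shutdown path, not an error.
            return isRunning.get() ? "unexpected error" : "expected during shutdown";
        }
    }
}
```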

Please take a look at this Scala code from _ShutdownableThread_. The thread itself handles all exceptions: when one is thrown, it leaves the _isRunning_ loop, logs the error only if the thread is still marked as running, and counts down the _shutdownLatch_ (from yesterday’s bug fix).
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

**Implementation**

_IllegalStateException_ is given special treatment through pause-and-retry logic:
- The _LOG_ call was changed to _debug_ level, so that retries do not fill the logs.
- _HookConsumer_ is more resilient: it handles exceptions raised by _Kafka_ and by the entity APIs.
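The following is a minimal sketch of the pause-and-retry idea under several assumptions, and it is not the code in the patch. `PauseAndRetry`, `Pauser`, and the bounded `maxAttempts` are illustrative names. The `FINE` level of java.util.logging stands in for the _debug_ level mentioned above. The pause strategy is injected, so it could be a fixed sleep or an adaptive waiter.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of pause-and-retry handling for IllegalStateException.
class PauseAndRetry {
    private static final Logger LOG = Logger.getLogger(PauseAndRetry.class.getName());

    // Pause strategy, e.g. a fixed sleep or an adaptive waiter (assumption).
    interface Pauser {
        void pause();
    }

    // Returns the first successful poll result, trying up to maxAttempts times.
    static <T> T pollWithRetry(Supplier<T> poll, Pauser pauser, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return poll.get();
            } catch (IllegalStateException e) {
                last = e;
                // FINE is java.util.logging's rough equivalent of debug, so
                // repeated retries stay out of the default (INFO) log output.
                LOG.log(Level.FINE, "poll failed on attempt " + attempt + "; pausing before retry", e);
                if (attempt < maxAttempts) {
                    pauser.pause(); // no pause after the final failed attempt
                }
            }
        }
        throw last; // all attempts failed: surface the last exception
    }
}
```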


Diffs (updated)
-----

  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
  webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 


Diff: https://reviews.apache.org/r/61665/diff/5/

Changes: https://reviews.apache.org/r/61665/diff/4-5/


Testing
-------

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Ashutosh Mestry <am...@hortonworks.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
-----------------------------------------------------------

(Updated Aug. 29, 2017, 10:37 p.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
-------

Updates include:
- Addressed review comments.
- Updated unit tests.


Bugs: ATLAS-2047
    https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
-------

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.

**Background**

The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_, which is called at the beginning of almost every method in that class. It checks whether the consumer is closed and, if so, throws _IllegalStateException_.

The scenario can arise as follows:
- Shutdown has been initiated, and _close_ is called on the consumer.
- However, the consumer thread is just about to enter another poll cycle.
- When that poll cycle starts, _acquire_ sees that the consumer is closed and throws the exception.

Please take a look at this Scala code from _ShutdownableThread_. The thread itself handles all exceptions: when one is thrown, it leaves the _isRunning_ loop, logs the error only if the thread is still marked as running, and counts down the _shutdownLatch_ (from yesterday’s bug fix).
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

**Implementation**

_IllegalStateException_ is given special treatment through pause-and-retry logic:
- The _LOG_ call was changed to _debug_ level, so that retries do not fill the logs.
- _HookConsumer_ is more resilient: it handles exceptions raised by _Kafka_ and by the entity APIs.


Diffs (updated)
-----

  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
  webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 


Diff: https://reviews.apache.org/r/61665/diff/4/

Changes: https://reviews.apache.org/r/61665/diff/3-4/


Testing
-------

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Madhan Neethiraj <ma...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184093
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 227 (patched)
<https://reviews.apache.org/r/61665/#comment260095>

    Consider marking 'resetInterval' as final.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 257 (patched)
<https://reviews.apache.org/r/61665/#comment260099>

    Consider renaming updateDurations() to setWaitDuration() and calling it from line #240.
    
    void setWaitDuration() {
      long now               = System.currentTimeMillis();
      long timeSinceLastWait = now - lastWaitAt;
    
      if (timeSinceLastWait > resetInterval) {
        waitDuration = minDuration; // reset
      } else if (waitDuration != maxDuration) {
        waitDuration += increment;
    
        if (waitDuration > maxDuration) {
          waitDuration = maxDuration;
        }
      }
      
      lastWaitAt = now;
    }



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 264 (patched)
<https://reviews.apache.org/r/61665/#comment260094>

    Shouldn't waitDuration be set to 'maxDuration' here? Otherwise, the next wait will start from minDuration, causing more frequent retries.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 275 (patched)
<https://reviews.apache.org/r/61665/#comment260103>

    Consider reading minWaitDuration, maxWaitDuration from configuration, right next to where consumerRetryInterval is read currently (line #120).
    
    consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
    minWaitDuration       = applicationProperties.getInt(MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
    maxWaitDuration       = applicationProperties.getInt(MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
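The suggested fallback chain can be checked with a small sketch. It uses `java.util.Properties` instead of Atlas's `applicationProperties`. The property keys are hypothetical placeholders, because the actual values of `CONSUMER_RETRY_INTERVAL`, `MIN_RETRY_INTERVAL`, and `MAX_RETRY_INTERVAL` are not shown in the review.

```java
import java.util.Properties;

// Sketch of the default chain suggested above. Keys are hypothetical placeholders.
class RetryIntervals {
    static final String CONSUMER_RETRY_INTERVAL = "example.consumer.retry.interval";     // hypothetical
    static final String MIN_RETRY_INTERVAL      = "example.consumer.min.retry.interval"; // hypothetical
    static final String MAX_RETRY_INTERVAL      = "example.consumer.max.retry.interval"; // hypothetical

    final int consumerRetryInterval;
    final int minWaitDuration;
    final int maxWaitDuration;

    RetryIntervals(Properties props) {
        consumerRetryInterval = getInt(props, CONSUMER_RETRY_INTERVAL, 500);
        minWaitDuration       = getInt(props, MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default
        maxWaitDuration       = getInt(props, MAX_RETRY_INTERVAL, minWaitDuration * 60);  // 30 s by default
    }

    // Returns the configured integer, or defaultValue when the key is absent.
    private static int getInt(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }
}
```

With nothing configured, the defaults resolve to 500 ms and 30 000 ms (500 × 60). Overriding only the minimum to 1000 ms also raises the maximum to 60 000 ms, because the maximum is derived from the minimum.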



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 279 (patched)
<https://reviews.apache.org/r/61665/#comment260104>

    'pauseDuration' seems to be used only in tests; consider removing this member and updating the tests to use minWaitDuration.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Line 246 (original), 306 (patched)
<https://reviews.apache.org/r/61665/#comment260105>

    Why not call 'adaptiveWaiter.pause()' here as well?


- Madhan Neethiraj


On Aug. 29, 2017, 9:01 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2017, 9:01 p.m.)


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Ashutosh Mestry <am...@hortonworks.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
-----------------------------------------------------------

(Updated Aug. 29, 2017, 9:01 p.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
-------

Updates include:
- _AdaptiveWaiter_ implementation.
- Added unit tests for new implementation.


Bugs: ATLAS-2047
    https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
-------

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.

**Background**

The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_, which is called at the beginning of almost every method in that class. It checks whether the consumer is closed and, if so, throws _IllegalStateException_.

The scenario can arise as follows:
- Shutdown has been initiated, and _close_ is called on the consumer.
- However, the consumer thread is just about to enter another poll cycle.
- When that poll cycle starts, _acquire_ sees that the consumer is closed and throws the exception.

Please take a look at this Scala code from _ShutdownableThread_. The thread itself handles all exceptions: when one is thrown, it leaves the _isRunning_ loop, logs the error only if the thread is still marked as running, and counts down the _shutdownLatch_ (from yesterday’s bug fix).
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

**Implementation**

_IllegalStateException_ is given special treatment through pause-and-retry logic:
- The _LOG_ call was changed to _debug_ level, so that retries do not fill the logs.
- _HookConsumer_ is more resilient: it handles exceptions raised by _Kafka_ and by the entity APIs.


Diffs (updated)
-----

  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
  webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 


Diff: https://reviews.apache.org/r/61665/diff/3/

Changes: https://reviews.apache.org/r/61665/diff/2-3/


Testing
-------

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Nixon Rodrigues <ni...@freestoneinfotech.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183106
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Line 245 (original), 253 (patched)
<https://reviews.apache.org/r/61665/#comment259112>

    Ashutosh,
    
    I think the slidingWindowPause method should be called for the other exceptions as well, since they also contribute to flooding the logs.


- Nixon Rodrigues


On Aug. 17, 2017, 5:04 a.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 17, 2017, 5:04 a.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
>     https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_, which is called at the beginning of almost every method in that class. It checks whether the consumer is closed and, if so, throws _IllegalStateException_.
> 
> The scenario can arise as follows:
> - Shutdown has been initiated, and _close_ is called on the consumer.
> - However, the consumer thread is just about to enter another poll cycle.
> - When that poll cycle starts, _acquire_ sees that the consumer is closed and throws the exception.
> 
> Please take a look at this Scala code from _ShutdownableThread_. The thread itself handles all exceptions: when one is thrown, it leaves the _isRunning_ loop, logs the error only if the thread is still marked as running, and counts down the _shutdownLatch_ (from yesterday’s bug fix).
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> _IllegalStateException_ is given special treatment through pause-and-retry logic:
> - The _LOG_ call was changed to _debug_ level, so that retries do not fill the logs.
> - _HookConsumer_ is more resilient: it handles exceptions raised by _Kafka_ and by the entity APIs.
> 
> 
> Diffs
> -----
> 
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/2/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Madhan Neethiraj <ma...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review183832
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 274 (patched)
<https://reviews.apache.org/r/61665/#comment259905>

    I think it will be useful to abstract this functionality in a class of its own.
    
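    class AdaptiveWaiter {
      private final long minWaitTime;
      private final long maxWaitTime;
      private final long waitIncrement;
      private final long resetInterval;
    
      private long waitTime;
      private long lastWaitAt = 0; // 0 means "never waited", so the first call starts at minWaitTime
    
      public AdaptiveWaiter(long minWaitTime, long maxWaitTime, long waitIncrement) {
        this.minWaitTime   = minWaitTime;
        this.maxWaitTime   = maxWaitTime;
        this.waitIncrement = waitIncrement;
        this.resetInterval = maxWaitTime * 2;
        this.waitTime      = minWaitTime;
      }
    
      public void wait(String message) {
        long now               = System.currentTimeMillis();
        long timeSinceLastWait = now - lastWaitAt;
    
        if (timeSinceLastWait > resetInterval) { // it has been a long time since the last call
          waitTime = minWaitTime;
        } else {
          waitTime = Math.min(waitTime + waitIncrement, maxWaitTime); // back off, capped at maxWaitTime
        }
    
        lastWaitAt = now;
    
        LOG.debug("{}: waiting for {} ms", message, waitTime); // LOG: assumed to be the enclosing class's logger
    
        try {
          Thread.sleep(waitTime);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
      }
    }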
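To make the suggested back-off schedule easy to unit-test, the wait-time computation can be separated from the sleep. The sketch below is a hypothetical, sleep-free variant (the class name `WaitTimeCalculator` and the caller-supplied clock are assumptions, not part of the patch); it keeps the same reset rule, `resetInterval = maxWaitTime * 2`.

```java
/**
 * Hypothetical, sleep-free variant of the AdaptiveWaiter idea: it only computes
 * the next wait time, and the caller supplies the clock (in milliseconds).
 */
class WaitTimeCalculator {
    private final long minWaitTime;
    private final long maxWaitTime;
    private final long waitIncrement;
    private final long resetInterval;

    private long waitTime;
    private long lastWaitAt;
    private boolean waitedBefore = false;

    WaitTimeCalculator(long minWaitTime, long maxWaitTime, long waitIncrement) {
        this.minWaitTime   = minWaitTime;
        this.maxWaitTime   = maxWaitTime;
        this.waitIncrement = waitIncrement;
        this.resetInterval = maxWaitTime * 2; // same reset rule as the suggestion
        this.waitTime      = minWaitTime;
    }

    /** Returns how long to wait for a failure observed at time {@code now}. */
    long next(long now) {
        if (!waitedBefore || now - lastWaitAt > resetInterval) {
            waitTime = minWaitTime;                                      // first failure, or quiet for a while
        } else {
            waitTime = Math.min(waitTime + waitIncrement, maxWaitTime); // repeated failures: back off
        }
        waitedBefore = true;
        lastWaitAt   = now;
        return waitTime;
    }
}
```

With `new WaitTimeCalculator(100, 500, 100)`, failures at t = 0, 100, 300, 600, 1000 and 1500 ms yield waits of 100, 200, 300, 400, 500 and 500 ms; a failure at t = 2501 ms, more than 1000 ms after the previous one, resets the wait to 100 ms.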


- Madhan Neethiraj


On Aug. 17, 2017, 5:04 a.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 17, 2017, 5:04 a.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
>     https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_, which is called at the beginning of almost every public method of _KafkaConsumer_. It checks whether the consumer has been closed and, if so, throws _IllegalStateException_.
> 
> The scenario can arise as follows:
> - Shutdown is initiated, and _close_ is called on the consumer.
> - Meanwhile, the consumer thread is about to enter another poll cycle.
> - At the start of that poll cycle, _acquire_ sees that the consumer is closed and throws the exception.
> 
> Please take a look at this Scala code from Kafka's _ShutdownableThread_, which handles all exceptions thrown by _doWork_. On an exception, it exits the _isRunning_ loop and counts down the _shutdownLatch_ (from yesterday’s bug fix). The error is logged only if the thread is still running, so exceptions raised during shutdown are not reported.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & retry logic:
> - Changed the log level to _debug_ so that retries do not fill the logs.
> - Made _HookConsumer_ more resilient: it now handles exceptions thrown by the _Kafka_ client and by the entity APIs.
> 
> 
> Diffs
> -----
> 
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/2/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Ashutosh Mestry <am...@hortonworks.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
-----------------------------------------------------------

(Updated Aug. 17, 2017, 5:04 a.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
-------

Updates include:
- Additional analysis.
- Implementation details.


Bugs: ATLAS-2047
    https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description (updated)
-------

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.

**Background**

The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_, which is called at the beginning of almost every public method of _KafkaConsumer_. It checks whether the consumer has been closed and, if so, throws _IllegalStateException_.

The scenario can arise as follows:
- Shutdown is initiated, and _close_ is called on the consumer.
- Meanwhile, the consumer thread is about to enter another poll cycle.
- At the start of that poll cycle, _acquire_ sees that the consumer is closed and throws the exception.
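The interleaving above can be reproduced deterministically with a stand-in consumer. This is a minimal sketch: `FakeConsumer` and `ShutdownRace` are hypothetical names, and the fake only mimics the "closed check at the start of every call" behaviour rather than using the real Kafka client. Two latches force `close()` to land between two poll cycles.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for KafkaConsumer: every public method starts with acquire(). */
class FakeConsumer {
    private volatile boolean closed = false;

    private void acquire() {
        // Same check as described above: a closed consumer rejects every call.
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    String poll() {
        acquire();
        return "records";
    }

    void close() {
        closed = true;
    }
}

class ShutdownRace {
    /** Forces the interleaving from the scenario and returns what the consumer thread caught. */
    static Throwable reproduce() {
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch firstPollDone = new CountDownLatch(1);
        CountDownLatch closeCalled   = new CountDownLatch(1);
        AtomicReference<Throwable> caught = new AtomicReference<>();

        Thread consumerThread = new Thread(() -> {
            try {
                consumer.poll();           // one normal poll cycle
                firstPollDone.countDown();
                closeCalled.await();       // "about to enter another poll cycle"
                consumer.poll();           // acquire() now sees the closed consumer
            } catch (Throwable t) {
                caught.set(t);
            }
        });

        try {
            consumerThread.start();
            firstPollDone.await();
            consumer.close();              // shutdown initiated from another thread
            closeCalled.countDown();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return caught.get();
    }
}
```

`ShutdownRace.reproduce()` returns the `IllegalStateException` raised by the second poll.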

Please take a look at this Scala code from Kafka's _ShutdownableThread_, which handles all exceptions thrown by _doWork_. On an exception, it exits the _isRunning_ loop and counts down the _shutdownLatch_ (from yesterday’s bug fix). The error is logged only if the thread is still running, so exceptions raised during shutdown are not reported.
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```
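For readers more at home in Java, the Scala `run()` loop above can be rendered roughly as follows. This is a hypothetical sketch, not Kafka's own class: `MiniShutdownableThread` and `FailingWorker` are illustrative names, and a list of strings stands in for `info()`/`error()` so the behaviour can be checked.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical Java rendering of the Scala run() loop; not Kafka's own class. */
abstract class MiniShutdownableThread extends Thread {
    final AtomicBoolean isRunning      = new AtomicBoolean(true);
    final CountDownLatch shutdownLatch = new CountDownLatch(1);
    final List<String> log             = new CopyOnWriteArrayList<>(); // stands in for info()/error()

    abstract void doWork() throws Exception;

    @Override
    public void run() {
        log.add("Starting");
        try {
            while (isRunning.get()) {
                doWork();
            }
        } catch (Throwable e) {
            // Reported only while still running; failures during shutdown stay silent.
            if (isRunning.get()) {
                log.add("Error due to " + e);
            }
        }
        shutdownLatch.countDown(); // released on normal exit and on exception alike
        log.add("Stopped");
    }
}

/** Example worker whose third doWork() call fails, as a closed consumer would. */
class FailingWorker extends MiniShutdownableThread {
    int calls = 0;

    @Override
    void doWork() {
        if (++calls == 3) {
            throw new IllegalStateException("consumer closed");
        }
    }
}
```

A single exception ends the loop: the error is logged once, the latch is released, and the thread stops, rather than logging on every iteration.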

**Implementation**

Special treatment is given to _IllegalStateException_ by implementing pause & retry logic:
- Changed the log level to _debug_ so that retries do not fill the logs.
- Made _HookConsumer_ more resilient: it now handles exceptions thrown by the _Kafka_ client and by the entity APIs.
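The pause-and-retry idea can be sketched as follows. This is an illustration under stated assumptions, not the code in the diff: `PauseAndRetry` and its `Work` interface are hypothetical, the pause is injected as a `LongConsumer` so tests need not sleep, and `java.util.logging`'s `FINE` level stands in for debug logging to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical pause-and-retry wrapper around one poll-and-process cycle. */
class PauseAndRetry {
    private static final Logger LOG = Logger.getLogger(PauseAndRetry.class.getName());

    /** Stands in for consumer.poll() followed by the entity API calls. */
    interface Work {
        void run();
    }

    private final AtomicBoolean isRunning;
    private final LongConsumer pause; // injected so tests need not really sleep
    private final long pauseMs;
    int pauses = 0;

    PauseAndRetry(AtomicBoolean isRunning, LongConsumer pause, long pauseMs) {
        this.isRunning = isRunning;
        this.pause     = pause;
        this.pauseMs   = pauseMs;
    }

    void doWork(Work work) {
        while (isRunning.get()) {
            try {
                work.run();
                return;                   // cycle completed
            } catch (IllegalStateException e) {
                if (!isRunning.get()) {
                    return;               // expected during shutdown: exit quietly
                }
                // FINE is java.util.logging's debug level, so retries do not fill the log.
                LOG.log(Level.FINE, "IllegalStateException; pausing " + pauseMs + " ms before retrying", e);
                pauses++;
                pause.accept(pauseMs);
            }
        }
    }
}
```

In production the pause could be `ms -> { try { Thread.sleep(ms); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } }`, or a back-off such as the adaptive waiter discussed in this review.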


Diffs
-----

  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 


Diff: https://reviews.apache.org/r/61665/diff/2/


Testing
-------

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry


Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Posted by Ashutosh Mestry <am...@hortonworks.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/
-----------------------------------------------------------

(Updated Aug. 16, 2017, 10:45 p.m.)


Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.


Changes
-------

Includes:
- Retry with _slidingWindow_ logic.
- Additional unit tests to verify retry logic.


Bugs: ATLAS-2047
    https://issues.apache.org/jira/browse/ATLAS-2047


Repository: atlas


Description
-------

Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.

**Implementation**

Please take a look at this scala code. This is _ShutdownableThread_. The thread does the job of handling all exceptions. Upon exception, it manages the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the _isRunning_ loop.
```scala
  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
```

Moved the _try...catch_ block so that exception handling is left to _ShutdownableThread_.
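Why moving the _try...catch_ matters can be shown with a small comparison. This is a hypothetical sketch (the names `LogFloodDemo`, `doWorkSwallowing` and `doWorkPropagating` are illustrative), assuming the loop keeps iterating while the consumer is already closed; the iteration cap stands in for `isRunning` so the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison: swallowing vs. propagating exceptions in doWork(). */
class LogFloodDemo {
    final List<String> errorLog = new ArrayList<>();
    boolean consumerClosed = true; // shutdown has already called close()

    void poll() {
        if (consumerClosed) {
            throw new IllegalStateException("consumer closed");
        }
    }

    // Before: doWork() catches and logs, so the surrounding loop never stops.
    void doWorkSwallowing() {
        try {
            poll();
        } catch (Exception e) {
            errorLog.add("Error in doWork: " + e.getMessage());
        }
    }

    // After: no try...catch, so the exception reaches the run() loop, which exits.
    void doWorkPropagating() {
        poll();
    }

    /** Mimics the isRunning loop; capped at maxIterations so the demo terminates. */
    static int runLoop(Runnable doWork, int maxIterations) {
        int iterations = 0;
        try {
            while (iterations < maxIterations) {
                iterations++;
                doWork.run();
            }
        } catch (RuntimeException e) {
            // ShutdownableThread-style: leave the loop on the first exception.
        }
        return iterations;
    }
}
```

With a closed consumer, `runLoop(demo::doWorkSwallowing, 1000)` runs all 1000 iterations and records 1000 error entries, whereas `runLoop(demo::doWorkPropagating, 1000)` stops after the first iteration without adding any.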


Diffs (updated)
-----

  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b 
  webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8 


Diff: https://reviews.apache.org/r/61665/diff/2/

Changes: https://reviews.apache.org/r/61665/diff/1-2/


Testing
-------

**Unit tests**
Updated unit tests to reproduce the scenarios and verify the fix.

**Functional tests**
Verified regular notification scenarios.


Thanks,

Ashutosh Mestry