You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2017/08/31 05:21:22 UTC

atlas git commit: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling

Repository: atlas
Updated Branches:
  refs/heads/master ef300f15a -> b837c0ee3


ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/b837c0ee
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/b837c0ee
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/b837c0ee

Branch: refs/heads/master
Commit: b837c0ee304174a100fc2f4507cbdd7e9a4195b9
Parents: ef300f1
Author: ashutoshm <am...@hortonworks.com>
Authored: Wed Aug 30 22:20:17 2017 -0700
Committer: ashutoshm <am...@hortonworks.com>
Committed: Wed Aug 30 22:20:17 2017 -0700

----------------------------------------------------------------------
 .../notification/NotificationHookConsumer.java  | 74 ++++++++++++++++++--
 .../atlas/notification/AdaptiveWaiterTest.java  | 68 ++++++++++++++++++
 .../NotificationHookConsumerTest.java           | 51 ++++++++++++++
 3 files changed, 188 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/b837c0ee/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index ef64c3b..858b320 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -84,6 +84,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
     public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
+    public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
+    public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
+
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
     private final AtlasEntityStore atlasEntityStore;
@@ -92,7 +95,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final AtlasTypeRegistry typeRegistry;
     private final int maxRetries;
     private final int failedMsgCacheSize;
-    private final int consumerRetryInterval;
+
+    @VisibleForTesting
+    final int consumerRetryInterval;
+    private final int minWaitDuration;
+    private final int maxWaitDuration;
 
     private NotificationInterface notificationInterface;
     private ExecutorService executors;
@@ -116,7 +123,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
         failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
         consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
-
+        minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms  by default
+        maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60);  //  30 sec by default
     }
 
     @Override
@@ -214,12 +222,64 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
+    /** Back-off helper: each pause sleeps {@code increment} ms longer than the previous one, capped at {@code maxDuration}. */
+    static class AdaptiveWaiter {
+        private final long minDuration;
+        private final long maxDuration;
+        private final long increment;
+        // Idle gap (ms) between two pauses after which the delay starts over at minDuration.
+        private final long resetInterval;
+        // System.currentTimeMillis() taken at the start of the previous pause; 0 before the first one.
+        private long lastWaitAt;
+        // Current delay in ms; starts at minDuration and is recomputed at the start of every pause.
+        @VisibleForTesting
+        long waitDuration;
+
+        public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
+            this.minDuration = minDuration;
+            this.maxDuration = maxDuration;
+            this.increment = increment;
+            this.resetInterval = maxDuration * 2;
+            this.waitDuration = minDuration;
+            this.lastWaitAt = 0;
+        }
+
+        /**
+         * Recomputes the delay and then sleeps for it. An interrupt ends the sleep early; it is
+         * only debug-logged and the interrupt status is not restored.
+         *
+         * @param ex failure that triggered the pause; used only in the debug log messages
+         */
+        public void pause(Exception ex) {
+            setWaitDurations();
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("{} in NotificationHookConsumer. Waiting for {} ms for recovery.", ex.getClass().getName(), waitDuration, ex);
+                }
+                Thread.sleep(waitDuration);
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("{} in NotificationHookConsumer. Waiting for recovery interrupted.", ex.getClass().getName(), e);
+                }
+            }
+        }
+
+        /** Restarts at minDuration after a gap longer than resetInterval; otherwise grows the delay up to maxDuration. */
+        private void setWaitDurations() {
+            final long sinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt;
+            lastWaitAt = System.currentTimeMillis();
+            waitDuration = (sinceLastWait > resetInterval) ? minDuration : Math.min(waitDuration + increment, maxDuration);
+        }
+    }
+
     @VisibleForTesting
     class HookConsumer extends ShutdownableThread {
         private final NotificationConsumer<HookNotificationMessage> consumer;
         private final AtomicBoolean shouldRun = new AtomicBoolean(false);
         private List<HookNotificationMessage> failedMessages = new ArrayList<>();
 
+        private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
+
         public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
             super("atlas-hook-consumer-thread", false);
             this.consumer = consumer;
@@ -242,16 +302,20 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
                             handleMessage(msg);
                         }
+                    } catch (IllegalStateException ex) {
+                        adaptiveWaiter.pause(ex);
                     } catch (Exception e) {
                         if (shouldRun.get()) {
                             LOG.warn("Exception in NotificationHookConsumer", e);
+                            adaptiveWaiter.pause(e);
+                        } else {
+                            break;
                         }
                     }
                 }
             } finally {
                 if (consumer != null) {
                     LOG.info("closing NotificationConsumer");
-
                     consumer.close();
                 }
 
@@ -424,7 +488,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
             // handle the case where thread was not started at all
             // and shutdown called
-            if(shouldRun.get() == false) {
+            if (shouldRun.get() == false) {
                 return;
             }
 
@@ -433,8 +497,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             if (consumer != null) {
                 consumer.wakeup();
             }
-            super.awaitShutdown();
 
+            super.awaitShutdown();
             LOG.info("<== HookConsumer shutdown()");
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/b837c0ee/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java b/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
new file mode 100644
index 0000000..3b4ba02
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification;
+
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+/** Exercises the back-off arithmetic of {@link NotificationHookConsumer.AdaptiveWaiter}. */
+public class AdaptiveWaiterTest {
+    private final int maxDuration = 100;
+    private final int minDuration = 5;
+    private final int increment = 5;
+    // NOTE(review): built once per class (@BeforeClass), so every test sees the wait state left by the others.
+    private NotificationHookConsumer.AdaptiveWaiter waiter;
+
+    @BeforeClass
+    public void setup() {
+        waiter = new NotificationHookConsumer.AdaptiveWaiter(minDuration, maxDuration, increment);
+    }
+
+    // NOTE(review): from 5 ms in 5 ms steps, 20 back-to-back pauses appear to end at the 100 ms cap, not 95 - confirm.
+    @Test
+    public void basicTest() {
+        for (int i = 0; i < 20; i++) {
+            waiter.pause(new IllegalStateException());
+        }
+
+        assertEquals(waiter.waitDuration, 95);
+    }
+
+    /** Many more failures than there are steps to the cap must leave the delay pinned at maxDuration. */
+    @Test
+    public void resetTest() {
+        final int someHighAttemptNumber = 30;
+        for (int i = 0; i < someHighAttemptNumber; i++) {
+            waiter.pause(new IllegalStateException());
+        }
+
+        assertEquals(waiter.waitDuration, maxDuration);
+    }
+
+    /** A quiet period longer than the reset interval must restart the delay at minDuration. */
+    @Test
+    public void longPauseResets() throws InterruptedException {
+        waiter.pause(new IllegalStateException());
+        // Stay idle well beyond the waiter's reset interval (2 * maxDuration = 200 ms); an interrupt
+        // now fails the test instead of being printed and ignored before the assertion runs.
+        Thread.sleep(1000);
+        waiter.pause(new IllegalArgumentException());
+        assertEquals(waiter.waitDuration, minDuration);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/b837c0ee/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index a6f58e8..2b4aa4f 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -251,4 +251,55 @@ public class NotificationHookConsumerTest {
         verify(notificationInterface).close();
         verify(executorService).shutdown();
     }
+
+    // With receive() stubbed to throw IllegalStateException, the hook consumer must still be alive after a second.
+    @Test
+    public void consumersThrowsIllegalStateExceptionThreadUsesPauseRetryLogic() throws Exception {
+        final NotificationHookConsumer notificationHookConsumer = setupNotificationHookConsumer();
+        // Stand in for the executor: submit() calls start() on the first hook consumer, then blocks for a second.
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                notificationHookConsumer.consumers.get(0).start();
+                Thread.sleep(1000);
+                return null;
+            }
+        }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+        notificationHookConsumer.startInternal(configuration, executorService);
+        Thread.sleep(1000);
+        assertTrue(notificationHookConsumer.consumers.get(0).isAlive());
+        notificationHookConsumer.consumers.get(0).shutdown();
+    }
+
+    // With receive() stubbed to throw IllegalStateException, shutdown() must still leave the hook consumer not alive.
+    @Test
+    public void consumersThrowsIllegalStateExceptionPauseRetryLogicIsInterrupted() throws Exception {
+        final NotificationHookConsumer notificationHookConsumer = setupNotificationHookConsumer();
+        // Stand in for the executor: submit() calls start() on the first hook consumer, then blocks for a second.
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                notificationHookConsumer.consumers.get(0).start();
+                Thread.sleep(1000);
+                return null;
+            }
+        }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+        notificationHookConsumer.startInternal(configuration, executorService);
+        Thread.sleep(1000);
+        notificationHookConsumer.consumers.get(0).shutdown();
+        assertFalse(notificationHookConsumer.consumers.get(0).isAlive());
+    }
+
+    // Builds a NotificationHookConsumer whose only (mocked) consumer throws IllegalStateException from receive().
+    private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException {
+        NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class);
+        when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException());
+        List<NotificationConsumer<Object>> consumers = new ArrayList<>();
+        consumers.add(notificationConsumerMock);
+        when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers);
+        when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
+        when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true);
+        when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1);
+        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry);
+    }
 }