You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2020/10/02 01:07:45 UTC

[asterixdb] 09/11: [NO ISSUE][*DB][ACT] Active stats synchronization

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit d2aefdccbac0ca4dc2ad50f45480c6c57f94a50e
Author: Michael Blow <mi...@couchbase.com>
AuthorDate: Wed Sep 30 20:12:36 2020 -0400

    [NO ISSUE][*DB][ACT] Active stats synchronization
    
    Avoid locks on stats refresh requests on non-running active entities
    
    Change-Id: I458f15cd4b199576b3236762c6da904f086147fd
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8185
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Tested-by: Michael Blow <mb...@apache.org>
---
 .../active/IActiveEntityEventSubscriber.java       |  2 +-
 .../active/IActiveEntityEventsListener.java        |  2 +-
 .../active/message/ActiveManagerMessage.java       |  2 +-
 .../active/message/ActiveStatsRequestMessage.java  | 10 ++++++++
 .../app/active/ActiveEntityEventsListener.java     | 25 ++++++++++++++-----
 .../asterix/test/active/ActiveStatsTest.java       | 19 +++++++++++---
 .../asterix/common/exceptions/ErrorCode.java       |  1 +
 .../src/main/resources/asx_errormsg/en.properties  |  1 +
 .../external/feed/watch/AbstractSubscriber.java    | 29 +++++++++++++++++-----
 .../feed/watch/WaitForStateSubscriber.java         |  2 +-
 10 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index e01d0a7..3c2f8e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -34,7 +34,7 @@ public interface IActiveEntityEventSubscriber {
     void notify(ActiveEvent event);
 
     /**
-     * Checkcs whether the subscriber is done receiving events
+     * Checks whether the subscriber is done receiving events
      *
      * @return
      */
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ca610aa..8338b2b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -75,7 +75,7 @@ public interface IActiveEntityEventsListener {
      * refresh the stats
      *
      * @param timeout
-     * @throws HyracksDataException
+     * @throws HyracksDataException with error code ASX3118 if the active entity is not currently running
      */
     void refreshStats(long timeout) throws HyracksDataException;
 
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index b8c44a6..1a2af13 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -56,6 +56,6 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr
 
     @Override
     public String toString() {
-        return ActiveManagerMessage.class.getSimpleName();
+        return getClass().getSimpleName();
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 0dbba52..117a68c 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -29,7 +29,17 @@ public class ActiveStatsRequestMessage extends ActiveManagerMessage {
         this.reqId = reqId;
     }
 
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+
     public long getReqId() {
         return reqId;
     }
+
+    @Override
+    public String toString() {
+        return "ActiveStatsRequestMessage{" + "reqId=" + reqId + '}';
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 882afc5..39ebdf7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.active;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -93,7 +95,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
     protected ActivityState prevState;
     protected JobId jobId;
     protected volatile long statsTimestamp;
-    protected String stats;
+    protected volatile String stats;
     protected volatile boolean isFetchingStats;
     protected int numRegistered;
     protected int numDeRegistered;
@@ -292,12 +294,17 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
     @Override
     public void refreshStats(long timeout) throws HyracksDataException {
         LOGGER.log(level, "refreshStats called");
+        // first check the state & whether we are already fetching, without taking the lock - if we are recovering, it
+        // may take some time to obtain the lock...
+        ensureRunning();
+        if (isFetchingStats) {
+            LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats);
+            return;
+        }
         synchronized (this) {
-            if (state != ActivityState.RUNNING) {
-                LOGGER.log(level, "returning immediately since state = " + state);
-                notifySubscribers(statsUpdatedEvent);
-                return;
-            } else if (isFetchingStats) {
+            // now that we have the lock, again verify the state & ensure we are not already fetching new stats
+            ensureRunning();
+            if (isFetchingStats) {
                 LOGGER.log(level, "returning immediately since fetchingStats = " + isFetchingStats);
                 return;
             } else {
@@ -323,6 +330,12 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
         isFetchingStats = false;
     }
 
+    protected void ensureRunning() throws RuntimeDataException {
+        if (state != ActivityState.RUNNING) {
+            throw new RuntimeDataException(ACTIVE_ENTITY_NOT_RUNNING, runtimeName, String.valueOf(state).toLowerCase());
+        }
+    }
+
     protected synchronized void notifySubscribers(ActiveEvent event) {
         notifyAll();
         Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 80dde8a..ca5e1e6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.asterix.test.active;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_RUNNING;
+import static org.apache.asterix.common.exceptions.ErrorCode.ASTERIX;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -116,7 +119,13 @@ public class ActiveStatsTest {
         Assert.assertTrue(requestedStats.contains("N/A"));
 
         // Update stats of not-started job
-        eventsListener.refreshStats(1000);
+        try {
+            eventsListener.refreshStats(1000);
+            Assert.fail("expected exception on refresh stats on not-started job");
+        } catch (HyracksDataException e) {
+            Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e,
+                    e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING));
+        }
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains("N/A"));
         WaitForStateSubscriber startingSubscriber =
@@ -127,8 +136,12 @@ public class ActiveStatsTest {
         startingSubscriber.sync();
         activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
         activeJobNotificationHandler.notifyJobStart(jobId);
-        eventsListener.refreshStats(1000);
-        requestedStats = eventsListener.getStats();
+        try {
+            eventsListener.refreshStats(1000);
+        } catch (HyracksDataException e) {
+            Assert.assertTrue("incorrect exception thrown (expected: ACTIVE_ENTITY_NOT_RUNNING, was: " + e,
+                    e.matches(ASTERIX, ACTIVE_ENTITY_NOT_RUNNING));
+        }
         Assert.assertTrue(requestedStats.contains("N/A"));
         // Fake partition message and notify eventListener
         ActivePartitionMessage partitionMessage =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d57c433..1d18cf6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -316,6 +316,7 @@ public class ErrorCode {
     public static final int FAILED_TO_PARSE_METADATA = 3115;
     public static final int INPUT_DECODE_FAILURE = 3116;
     public static final int FAILED_TO_PARSE_MALFORMED_LOG_RECORD = 3117;
+    public static final int ACTIVE_ENTITY_NOT_RUNNING = 3118;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 5cf3968..a2780ba 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -314,6 +314,7 @@
 3115 = Failed to parse record metadata
 3116 = Failed to decode input
 3117 = Failed to parse record, malformed log record
+3118 = Active Entity %1$s is not running (it is %2$s)
 
 # Lifecycle management errors
 4000 = Partition id %1$s for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 37b157e..726880d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.feed.watch;
 
+import java.util.Objects;
+
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.hyracks.util.Span;
@@ -25,11 +27,19 @@ import org.apache.hyracks.util.Span;
 public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
 
     protected final IActiveEntityEventsListener listener;
+    private final Object lockObject;
     private volatile boolean done = false;
     private volatile Exception failure = null;
 
+    public AbstractSubscriber(IActiveEntityEventsListener listener, Object lockObject) {
+        Objects.requireNonNull(lockObject);
+        this.listener = listener;
+        this.lockObject = lockObject;
+    }
+
     public AbstractSubscriber(IActiveEntityEventsListener listener) {
         this.listener = listener;
+        this.lockObject = this;
     }
 
     @Override
@@ -38,28 +48,28 @@ public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber
     }
 
     public void complete(Exception failure) {
-        synchronized (listener) {
+        synchronized (lockObject) {
             if (failure != null) {
                 this.failure = failure;
             }
             done = true;
-            listener.notifyAll();
+            lockObject.notifyAll();
         }
     }
 
     @Override
     public void sync() throws InterruptedException {
-        synchronized (listener) {
+        synchronized (lockObject) {
             while (!done) {
-                listener.wait();
+                lockObject.wait();
             }
         }
     }
 
     public boolean sync(Span span) throws InterruptedException {
-        synchronized (listener) {
+        synchronized (lockObject) {
             while (!done) {
-                span.wait(listener);
+                span.wait(lockObject);
                 if (done || span.elapsed()) {
                     return done;
                 }
@@ -71,4 +81,11 @@ public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber
     public Exception getFailure() {
         return failure;
     }
+
+    protected void reset() {
+        synchronized (lockObject) {
+            done = false;
+            failure = null;
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index 818d826..4dc86ac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -29,7 +29,7 @@ public class WaitForStateSubscriber extends AbstractSubscriber {
     private final Set<ActivityState> targetStates;
 
     public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates) {
-        super(listener);
+        super(listener, listener);
         this.targetStates = targetStates;
         listener.subscribe(this);
     }