You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2021/04/07 12:48:00 UTC

[asterixdb] 19/25: [NO ISSUE][*DB][ACT] += ActiveManagerMessage.GENERIC_EVENT

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit d18d6ac28b154e1570f863d779b9ae59add6887f
Author: Michael Blow <mb...@apache.org>
AuthorDate: Sat Apr 3 23:12:59 2021 -0400

    [NO ISSUE][*DB][ACT] += ActiveManagerMessage.GENERIC_EVENT
    
    Change-Id: I8f8986a90a6ac34a24118ace1a76401d65924055
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10863
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../org/apache/asterix/active/ActiveManager.java    | 21 +++++++++++++++++++--
 .../org/apache/asterix/active/IActiveRuntime.java   |  5 +++++
 .../active/message/ActiveManagerMessage.java        | 14 +++++++++++---
 .../active/message/ActiveStatsRequestMessage.java   |  6 +++---
 .../active/message/StopRuntimeParameters.java       | 12 ++----------
 .../app/active/ActiveEntityEventsListener.java      |  2 +-
 .../apache/asterix/external/api/IRecordReader.java  |  5 +++++
 .../dataflow/AbstractFeedDataFlowController.java    |  5 +++++
 .../dataflow/FeedRecordDataFlowController.java      |  6 ++++++
 .../external/dataset/adapter/FeedAdapter.java       |  5 +++++
 10 files changed, 62 insertions(+), 19 deletions(-)

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index ba04967..08e1be4 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -112,14 +112,31 @@ public class ActiveManager {
             case REQUEST_STATS:
                 requestStats((ActiveStatsRequestMessage) message);
                 break;
+            case GENERIC_EVENT:
+                deliverGenericEvent(message);
+                break;
             default:
                 LOGGER.warn("Unknown message type received: " + message.getKind());
         }
     }
 
+    private void deliverGenericEvent(ActiveManagerMessage message) throws HyracksDataException {
+        try { // look up the runtime named by the message and hand it the event
+            ActiveRuntimeId runtimeId = message.getRuntimeId();
+            IActiveRuntime runtime = runtimes.get(runtimeId);
+            if (runtime == null) { // nothing registered under this id: warn and drop the event
+                LOGGER.warn("Request for a runtime {} that is not registered {}", runtimeId, message);
+                return;
+            }
+            runtime.handleGenericEvent(message);
+        } catch (Exception e) { // also catches unchecked exceptions thrown by the runtime's handler
+            throw HyracksDataException.create(e);
+        }
+    }
+
     private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException {
         try {
-            ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+            ActiveRuntimeId runtimeId = message.getRuntimeId();
             IActiveRuntime runtime = runtimes.get(runtimeId);
             long reqId = message.getReqId();
             if (runtime == null) {
@@ -168,7 +185,7 @@ public class ActiveManager {
     @SuppressWarnings("squid:S1181") // Catch Error
     private void stopRuntime(ActiveManagerMessage message) {
         StopRuntimeParameters content = (StopRuntimeParameters) message.getPayload();
-        ActiveRuntimeId runtimeId = content.getRuntimeId();
+        ActiveRuntimeId runtimeId = message.getRuntimeId();
         IActiveRuntime runtime = runtimes.get(runtimeId);
         if (runtime == null) {
             LOGGER.warn("Request to stop runtime: " + runtimeId
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index a52f01e..b8edc64 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -20,6 +20,7 @@ package org.apache.asterix.active;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IActiveRuntime {
@@ -48,4 +49,8 @@ public interface IActiveRuntime {
     default String getStats() {
         return "\"Runtime stats is not available.\"";
     }
+
+    default void handleGenericEvent(ActiveManagerMessage event) throws HyracksDataException { // default: rejects
+        throw new IllegalStateException("generic events not supported for runtime " + getRuntimeId());
+    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 1a2af13..4d726cf 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -21,6 +21,7 @@ package org.apache.asterix.active.message;
 import java.io.Serializable;
 
 import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.CcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
@@ -29,15 +30,18 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage {
     public enum Kind {
         STOP_ACTIVITY,
-        REQUEST_STATS
+        REQUEST_STATS,
+        GENERIC_EVENT
     }
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final Kind kind;
+    private final ActiveRuntimeId runtimeId;
     private final Serializable payload;
 
-    public ActiveManagerMessage(Kind kind, Serializable payload) {
+    public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload) {
         this.kind = kind;
+        this.runtimeId = runtimeId;
         this.payload = payload;
     }
 
@@ -45,6 +49,10 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr
         return payload;
     }
 
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId; // the id passed to the constructor
+    }
+
     public Kind getKind() {
         return kind;
     }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 117a68c..94668a0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.active.message;
 
-import java.io.Serializable;
+import org.apache.asterix.active.ActiveRuntimeId;
 
 public class ActiveStatsRequestMessage extends ActiveManagerMessage {
     private static final long serialVersionUID = 1L;
     private final long reqId;
 
-    public ActiveStatsRequestMessage(Serializable payload, long reqId) {
-        super(Kind.REQUEST_STATS, payload);
+    public ActiveStatsRequestMessage(ActiveRuntimeId runtimeId, long reqId) {
+        super(Kind.REQUEST_STATS, runtimeId, null); // no payload is sent with a stats request
         this.reqId = reqId;
     }
 
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
index fbc41a1..c21f06e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
@@ -21,25 +21,17 @@ package org.apache.asterix.active.message;
 import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.asterix.active.ActiveRuntimeId;
-
 public class StopRuntimeParameters implements Serializable {
 
-    private static final long serialVersionUID = 1L;
-    private final ActiveRuntimeId runtimeId;
+    private static final long serialVersionUID = 2L;
     private final long timeout;
     private final TimeUnit unit;
 
-    public StopRuntimeParameters(ActiveRuntimeId runtimeId, long timeout, TimeUnit unit) {
-        this.runtimeId = runtimeId;
+    public StopRuntimeParameters(long timeout, TimeUnit unit) {
         this.timeout = timeout;
         this.unit = unit;
     }
 
-    public ActiveRuntimeId getRuntimeId() {
-        return runtimeId;
-    }
-
     public long getTimeout() {
         return timeout;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index c90cde0..5f7d65e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -515,7 +515,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
             }
             ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
             messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
-                    new StopRuntimeParameters(runtimeId, timeout, unit)), location);
+                    runtimeId, new StopRuntimeParameters(timeout, unit)), location);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index cb97526..0e6ddb2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
@@ -95,4 +96,8 @@ public interface IRecordReader<T> extends Closeable {
     default LongSupplier getLineNumber() {
         return ExternalDataConstants.NO_LINES;
     }
+
+    default void handleGenericEvent(ActiveManagerMessage event) { // default: always rejects the event
+        throw new IllegalStateException("unexpected generic event " + event);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index b58e604..94d9e6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.dataflow;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -57,4 +58,8 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
             feedLogManager.close();
         }
     }
+
+    public void handleGenericEvent(ActiveManagerMessage event) { // base behavior: always rejects the event
+        throw new IllegalStateException("unexpected generic event " + event);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 56257a8..8cec5de 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.dataflow;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
@@ -280,4 +281,9 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
                 .append("}");
         return str.toString();
     }
+
+    @Override
+    public void handleGenericEvent(ActiveManagerMessage event) {
+        recordReader.handleGenericEvent(event); // forward the event to the record reader
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 54e633a..fc9b727 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.dataset.adapter;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -69,4 +70,8 @@ public class FeedAdapter implements IDataSourceAdapter, Closeable {
     public void close() throws IOException {
         controller.close();
     }
+
+    public void handleGenericEvent(ActiveManagerMessage event) {
+        controller.handleGenericEvent(event); // forward the event to the controller
+    }
 }