You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2021/04/07 12:48:00 UTC
[asterixdb] 19/25: [NO ISSUE][*DB][ACT] += ActiveManagerMessage.GENERIC_EVENT
This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d18d6ac28b154e1570f863d779b9ae59add6887f
Author: Michael Blow <mb...@apache.org>
AuthorDate: Sat Apr 3 23:12:59 2021 -0400
[NO ISSUE][*DB][ACT] += ActiveManagerMessage.GENERIC_EVENT
Change-Id: I8f8986a90a6ac34a24118ace1a76401d65924055
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10863
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
.../org/apache/asterix/active/ActiveManager.java | 21 +++++++++++++++++++--
.../org/apache/asterix/active/IActiveRuntime.java | 5 +++++
.../active/message/ActiveManagerMessage.java | 14 +++++++++++---
.../active/message/ActiveStatsRequestMessage.java | 6 +++---
.../active/message/StopRuntimeParameters.java | 12 ++----------
.../app/active/ActiveEntityEventsListener.java | 2 +-
.../apache/asterix/external/api/IRecordReader.java | 5 +++++
.../dataflow/AbstractFeedDataFlowController.java | 5 +++++
.../dataflow/FeedRecordDataFlowController.java | 6 ++++++
.../external/dataset/adapter/FeedAdapter.java | 5 +++++
10 files changed, 62 insertions(+), 19 deletions(-)
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index ba04967..08e1be4 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -112,14 +112,31 @@ public class ActiveManager {
case REQUEST_STATS:
requestStats((ActiveStatsRequestMessage) message);
break;
+ case GENERIC_EVENT:
+ deliverGenericEvent(message);
+ break;
default:
LOGGER.warn("Unknown message type received: " + message.getKind());
}
}
+ private void deliverGenericEvent(ActiveManagerMessage message) throws HyracksDataException {
+ try {
+ ActiveRuntimeId runtimeId = message.getRuntimeId();
+ IActiveRuntime runtime = runtimes.get(runtimeId);
+ if (runtime == null) {
+ LOGGER.warn("Request for a runtime {} that is not registered {}", runtimeId, message);
+ return;
+ }
+ runtime.handleGenericEvent(message);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException {
try {
- ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+ ActiveRuntimeId runtimeId = message.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
long reqId = message.getReqId();
if (runtime == null) {
@@ -168,7 +185,7 @@ public class ActiveManager {
@SuppressWarnings("squid:S1181") // Catch Error
private void stopRuntime(ActiveManagerMessage message) {
StopRuntimeParameters content = (StopRuntimeParameters) message.getPayload();
- ActiveRuntimeId runtimeId = content.getRuntimeId();
+ ActiveRuntimeId runtimeId = message.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
LOGGER.warn("Request to stop runtime: " + runtimeId
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index a52f01e..b8edc64 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -20,6 +20,7 @@ package org.apache.asterix.active;
import java.util.concurrent.TimeUnit;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IActiveRuntime {
@@ -48,4 +49,8 @@ public interface IActiveRuntime {
default String getStats() {
return "\"Runtime stats is not available.\"";
}
+
+ default void handleGenericEvent(ActiveManagerMessage event) throws HyracksDataException {
+ throw new IllegalStateException("generic events not supported for runtime " + getRuntimeId());
+ }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 1a2af13..4d726cf 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -21,6 +21,7 @@ package org.apache.asterix.active.message;
import java.io.Serializable;
import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
@@ -29,15 +30,18 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage {
public enum Kind {
STOP_ACTIVITY,
- REQUEST_STATS
+ REQUEST_STATS,
+ GENERIC_EVENT
}
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final Kind kind;
+ private final ActiveRuntimeId runtimeId;
private final Serializable payload;
- public ActiveManagerMessage(Kind kind, Serializable payload) {
+ public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload) {
this.kind = kind;
+ this.runtimeId = runtimeId;
this.payload = payload;
}
@@ -45,6 +49,10 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr
return payload;
}
+ public ActiveRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
public Kind getKind() {
return kind;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index 117a68c..94668a0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.active.message;
-import java.io.Serializable;
+import org.apache.asterix.active.ActiveRuntimeId;
public class ActiveStatsRequestMessage extends ActiveManagerMessage {
private static final long serialVersionUID = 1L;
private final long reqId;
- public ActiveStatsRequestMessage(Serializable payload, long reqId) {
- super(Kind.REQUEST_STATS, payload);
+ public ActiveStatsRequestMessage(ActiveRuntimeId runtimeId, long reqId) {
+ super(Kind.REQUEST_STATS, runtimeId, null);
this.reqId = reqId;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
index fbc41a1..c21f06e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java
@@ -21,25 +21,17 @@ package org.apache.asterix.active.message;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
-import org.apache.asterix.active.ActiveRuntimeId;
-
public class StopRuntimeParameters implements Serializable {
- private static final long serialVersionUID = 1L;
- private final ActiveRuntimeId runtimeId;
+ private static final long serialVersionUID = 2L;
private final long timeout;
private final TimeUnit unit;
- public StopRuntimeParameters(ActiveRuntimeId runtimeId, long timeout, TimeUnit unit) {
- this.runtimeId = runtimeId;
+ public StopRuntimeParameters(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
}
- public ActiveRuntimeId getRuntimeId() {
- return runtimeId;
- }
-
public long getTimeout() {
return timeout;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index c90cde0..5f7d65e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -515,7 +515,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
}
ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
- new StopRuntimeParameters(runtimeId, timeout, unit)), location);
+ runtimeId, new StopRuntimeParameters(timeout, unit)), location);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index cb97526..0e6ddb2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
@@ -95,4 +96,8 @@ public interface IRecordReader<T> extends Closeable {
default LongSupplier getLineNumber() {
return ExternalDataConstants.NO_LINES;
}
+
+ default void handleGenericEvent(ActiveManagerMessage event) {
+ throw new IllegalStateException("unexpected generic event " + event);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index b58e604..94d9e6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.dataflow;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -57,4 +58,8 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
feedLogManager.close();
}
}
+
+ public void handleGenericEvent(ActiveManagerMessage event) {
+ throw new IllegalStateException("unexpected generic event " + event);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 56257a8..8cec5de 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.dataflow;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IRawRecord;
@@ -280,4 +281,9 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
.append("}");
return str.toString();
}
+
+ @Override
+ public void handleGenericEvent(ActiveManagerMessage event) {
+ recordReader.handleGenericEvent(event);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 54e633a..fc9b727 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.dataset.adapter;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -69,4 +70,8 @@ public class FeedAdapter implements IDataSourceAdapter, Closeable {
public void close() throws IOException {
controller.close();
}
+
+ public void handleGenericEvent(ActiveManagerMessage event) {
+ controller.handleGenericEvent(event);
+ }
}