You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org> on 2017/07/21 22:50:42 UTC
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1894
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Separate Predistributed Jobs from other Active Jobs
Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
---
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
11 files changed, 203 insertions(+), 89 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/94/1894/1
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 5775e9b..c6c3f08 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -59,8 +59,8 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
- MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(getApplicationContext(),
+ metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 1b655da..df6b19a 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,10 +18,9 @@
*/
package org.apache.asterix.bad.lang.statement;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
@@ -91,10 +90,9 @@
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Channel channel = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -115,7 +113,7 @@
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- activeEventHandler.removeListener(listener);
+ activeEventHandler.unregisterListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 7d91d2e..bdde6f8 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -21,15 +21,17 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.StringReader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobService;
@@ -42,6 +44,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.lang.common.base.Expression;
@@ -56,7 +59,6 @@
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -67,7 +69,6 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -131,11 +132,13 @@
return channelResultsInsertQuery;
}
- @Override public byte getCategory() {
+ @Override
+ public byte getCategory() {
return Category.DDL;
}
- @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
return null;
}
@@ -160,7 +163,8 @@
}
- @Override public byte getKind() {
+ @Override
+ public byte getKind() {
return Kind.EXTENSION;
}
@@ -188,14 +192,13 @@
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
- DatasetDecl createResultsDataset =
- new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME),
- resultsTypeName, null, null, null, null, new HashMap<String, String>(),
- new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+ DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
+ new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
//Run both statements to create datasets
- ((QueryTranslator) statementExecutor)
- .handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc);
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+ hcc);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
@@ -231,9 +234,8 @@
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
- return ((QueryTranslator) statementExecutor)
- .handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null,
- stats, true, null, null);
+ return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
+ hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
}
private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
@@ -244,15 +246,15 @@
if (predistributed) {
jobId = hcc.distributeJob(channeljobSpec);
}
- ScheduledExecutorService ses = ChannelJobService
- .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc,
- ChannelJobService.findPeriod(duration));
+ ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
+ jobId, hcc, ChannelJobService.findPeriod(duration));
listener.storeDistributedInfo(jobId, ses, null);
}
}
- @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
@@ -271,10 +273,9 @@
Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Channel channel = null;
@@ -287,7 +288,7 @@
throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
}
if (listener != null) {
- alreadyActive = listener.isEntityActive();
+ alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Channel " + channelName + " is already running");
@@ -310,9 +311,8 @@
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
- JobSpecification channeljobSpec =
- createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
- dataverse);
+ JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
+ tempMdProvider, hcc, hdc, stats, dataverse);
// Now we subscribe
if (listener == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index d203905..1deff05 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -26,10 +26,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
@@ -41,6 +40,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
@@ -52,7 +52,6 @@
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -96,7 +95,8 @@
this.period = (CallExpr) period;
}
- @Override public byte getKind() {
+ @Override
+ public byte getKind() {
return Kind.EXTENSION;
}
@@ -108,7 +108,8 @@
return signature;
}
- @Override public byte getCategory() {
+ @Override
+ public byte getCategory() {
return Category.DDL;
}
@@ -116,7 +117,8 @@
return period;
}
- @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
return null;
}
@@ -167,9 +169,10 @@
throw new CompilationException("Procedure can only execute a single statement");
}
if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
- return new Pair<>(((QueryTranslator) statementExecutor)
- .handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC,
- null, stats, true, null, null), PrecompiledType.INSERT);
+ return new Pair<>(
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null),
+ PrecompiledType.INSERT);
} else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
Pair<JobSpecification, PrecompiledType> pair =
new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
@@ -179,8 +182,8 @@
} else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
fStatements.get(0).accept(visitor, null);
- return new Pair<>(((QueryTranslator) statementExecutor)
- .handleDeleteStatement(metadataProvider, fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+ return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
+ fStatements.get(0), hcc, true), PrecompiledType.DELETE);
} else {
throw new CompilationException("Procedure can only execute a single delete, insert, or query");
}
@@ -193,18 +196,18 @@
listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
}
- @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
initialize();
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Procedure procedure = null;
@@ -212,13 +215,13 @@
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- procedure = BADLangExtension
- .getProcedure(mdTxnCtx, dataverse, signature.getName(), Integer.toString(signature.getArity()));
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+ Integer.toString(signature.getArity()));
if (procedure != null) {
throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
}
if (listener != null) {
- alreadyActive = listener.isEntityActive();
+ alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Procedure " + signature.getName() + " is already running");
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 9bf3718..e60570c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -21,11 +21,10 @@
import java.util.EnumSet;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
@@ -95,13 +94,12 @@
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index f7c3a74..a1bf0d3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -18,10 +18,9 @@
*/
package org.apache.asterix.bad.lang.statement;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
@@ -82,16 +81,15 @@
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
FunctionSignature signature = getFunctionSignature();
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
signature.setNamespace(dataverse);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -115,7 +113,7 @@
}
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- activeEventHandler.removeListener(listener);
+ activeEventHandler.unregisterListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd4470f..5597b0c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -25,6 +25,7 @@
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.common.api.ExtensionId;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.utils.MetadataUtil;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
@@ -34,7 +35,6 @@
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
-import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -42,8 +42,8 @@
public class BADMetadataExtension implements IMetadataExtension {
- public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
- BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+ public static final ExtensionId BAD_METADATA_EXTENSION_ID =
+ new ExtensionId(BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
index d3d2e66..de1aab8 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -19,8 +19,8 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index e6d0249..d577260 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -19,9 +19,9 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 5eb18d1..8e0cf5f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -18,27 +18,30 @@
*/
package org.apache.asterix.bad.metadata;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
- private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
+public class PrecompiledJobEventListener implements IActiveEntityEventsListener {
- private ScheduledExecutorService executorService = null;
- private ResultReader resultReader;
+ private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
public enum PrecompiledType {
CHANNEL,
@@ -47,13 +50,117 @@
DELETE
}
+ enum RequestState {
+ INIT,
+ STARTED,
+ FINISHED
+ }
+
+ private ScheduledExecutorService executorService = null;
+ private ResultReader resultReader;
private final PrecompiledType type;
+ // members
+ protected volatile ActivityState state;
+ protected JobId jobId;
+ protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
+ protected final ICcApplicationContext appCtx;
+ protected final EntityId entityId;
+ protected final List<IDataset> datasets;
+ protected final ActiveEvent statsUpdatedEvent;
+ protected long statsTimestamp;
+ protected String stats;
+ protected RequestState statsRequestState;
+ protected final String runtimeName;
+ protected final AlgebricksAbsolutePartitionConstraint locations;
+ protected int numRegistered;
public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
- super(appCtx, entityId, datasets, locations, runtimeName);
+ this.appCtx = appCtx;
+ this.entityId = entityId;
+ this.datasets = datasets;
+ this.state = ActivityState.STOPPED;
+ this.statsTimestamp = -1;
+ this.statsRequestState = RequestState.INIT;
+ this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
+ this.stats = "{\"Stats\":\"N/A\"}";
+ this.runtimeName = runtimeName;
+ this.locations = locations;
+ this.numRegistered = 0;
state = ActivityState.STOPPED;
this.type = type;
+ }
+
+ protected synchronized void handle(ActivePartitionMessage message) {
+ if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+ numRegistered++;
+ if (numRegistered == locations.getLocations().length) {
+ state = ActivityState.RUNNING;
+ }
+ }
+ }
+
+ @Override
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ @Override
+ public ActivityState getState() {
+ return state;
+ }
+
+ @Override
+ public boolean isEntityUsingDataset(IDataset dataset) {
+ return datasets.contains(dataset);
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public String getStats() {
+ return stats;
+ }
+
+ @Override
+ public long getStatsTimeStamp() {
+ return statsTimestamp;
+ }
+
+ public String formatStats(List<String> responses) {
+ StringBuilder strBuilder = new StringBuilder();
+ strBuilder.append("{\"Stats\": [").append(responses.get(0));
+ for (int i = 1; i < responses.size(); i++) {
+ strBuilder.append(", ").append(responses.get(i));
+ }
+ strBuilder.append("]}");
+ return strBuilder.toString();
+ }
+
+ protected synchronized void notifySubscribers(ActiveEvent event) {
+ notifyAll();
+ Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
+ while (it.hasNext()) {
+ IActiveEntityEventSubscriber subscriber = it.next();
+ if (subscriber.isDone()) {
+ it.remove();
+ } else {
+ try {
+ subscriber.notify(event);
+ } catch (HyracksDataException e) {
+ LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
+ }
+ if (subscriber.isDone()) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public AlgebricksAbsolutePartitionConstraint getLocations() {
+ return locations;
}
public ResultReader getResultReader() {
@@ -72,10 +179,6 @@
public ScheduledExecutorService getExecutorService() {
return executorService;
- }
-
- public boolean isEntityActive() {
- return state == ActivityState.STARTED;
}
public void deActivate() {
@@ -110,7 +213,7 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job started for " + entityId);
}
- state = ActivityState.STARTED;
+ state = ActivityState.RUNNING;
}
private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
@@ -120,7 +223,21 @@
}
@Override
- public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException {
+ public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
// no op
}
+
+ @Override
+ public boolean isActive() {
+ return state == ActivityState.RUNNING;
+ }
+
+ @Override
+ public void unregister() throws HyracksDataException {
+ }
+
+ @Override
+ public Exception getJobFailure() {
+ return null;
+ }
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
index f324a46..1aa633f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -26,8 +26,8 @@
import java.util.List;
import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Xikui Wang has posted comments on this change.
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Patch Set 1:
(1 comment)
https://asterix-gerrit.ics.uci.edu/#/c/1894/1/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java:
PS1, Line 116: unregisterListener
This is a new interface change? Should I wait after the main one is merged to +2 this one? The rests look good to me.
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-HasComments: Yes
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/160/
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "Till Westmann (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,
I'd like you to reexamine a change. Please visit
https://asterix-gerrit.ics.uci.edu/1894
to look at the new patch set (#2).
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Separate Predistributed Jobs from other Active Jobs
Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
---
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
14 files changed, 209 insertions(+), 100 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/94/1894/2
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Patch Set 2:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterixbad-gerrit/161/
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Patch Set 2: Code-Review+2
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has submitted this change and it was merged.
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Separate Predistributed Jobs from other Active Jobs
Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
---
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
14 files changed, 209 insertions(+), 100 deletions(-)
Approvals:
abdullah alamoudi: Looks good to me, approved
Jenkins: Verified
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
index 09a436b..8f5d520 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADQueryTranslatorFactory.java
@@ -33,7 +33,6 @@
@Override
public QueryTranslator create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) {
- return new BADStatementExecutor(appCtx, statements, output, compilationProvider, storageComponentProvider,
- executorService);
+ return new BADStatementExecutor(appCtx, statements, output, compilationProvider, executorService);
}
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 5775e9b..bc17a7d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -28,7 +28,6 @@
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.Procedure;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
@@ -44,9 +43,8 @@
public class BADStatementExecutor extends QueryTranslator {
public BADStatementExecutor(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
- ILangCompilationProvider compliationProvider, IStorageComponentProvider storageComponentProvider,
- ExecutorService executorService) {
- super(appCtx, statements, output, compliationProvider, storageComponentProvider, executorService);
+ ILangCompilationProvider compliationProvider, ExecutorService executorService) {
+ super(appCtx, statements, output, compliationProvider, executorService);
}
@Override
@@ -59,8 +57,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
Identifier dvId = ((DataverseDropStatement) stmt).getDataverseName();
List<Broker> brokers = BADLangExtension.getBrokers(mdTxnCtx, dvId.getValue());
- MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
for (Broker broker : brokers) {
tempMdProvider.getLocks().reset();
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 1b655da..907bd0e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,10 +18,9 @@
*/
package org.apache.asterix.bad.lang.statement;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
@@ -91,10 +90,9 @@
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Channel channel = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -115,14 +113,13 @@
listener.getExecutorService().shutdownNow();
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- activeEventHandler.removeListener(listener);
+ activeEventHandler.unregisterListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
//Create a metadata provider to use in nested jobs.
- MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
+ MetadataProvider tempMdProvider = new MetadataProvider(appCtx, metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
index 5bf0690..df8dab1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -186,7 +186,7 @@
subscriptionTuple.setVarCounter(varCounter);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
- metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+ metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
if (subscriptionId == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index 60de69e..28d09df 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -144,7 +144,7 @@
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
delete.accept(visitor, null);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
- metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+ metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 7d91d2e..a43020f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -21,15 +21,17 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.StringReader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobService;
@@ -42,6 +44,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.lang.common.base.Expression;
@@ -56,7 +59,6 @@
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -67,7 +69,6 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -131,11 +132,13 @@
return channelResultsInsertQuery;
}
- @Override public byte getCategory() {
+ @Override
+ public byte getCategory() {
return Category.DDL;
}
- @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
return null;
}
@@ -160,7 +163,8 @@
}
- @Override public byte getKind() {
+ @Override
+ public byte getKind() {
return Kind.EXTENSION;
}
@@ -188,14 +192,13 @@
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
- DatasetDecl createResultsDataset =
- new DatasetDecl(new Identifier(dataverse), resultsName, new Identifier(BADConstants.BAD_DATAVERSE_NAME),
- resultsTypeName, null, null, null, null, new HashMap<String, String>(),
- new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+ DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
+ new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
+ new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
//Run both statements to create datasets
- ((QueryTranslator) statementExecutor)
- .handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc);
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+ hcc);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
@@ -231,9 +234,8 @@
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
- return ((QueryTranslator) statementExecutor)
- .handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, hdc, ResultDelivery.ASYNC, null,
- stats, true, null, null);
+ return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
+ hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
}
private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc,
@@ -244,15 +246,15 @@
if (predistributed) {
jobId = hcc.distributeJob(channeljobSpec);
}
- ScheduledExecutorService ses = ChannelJobService
- .startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId, hcc,
- ChannelJobService.findPeriod(duration));
+ ScheduledExecutorService ses = ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
+ jobId, hcc, ChannelJobService.findPeriod(duration));
listener.storeDistributedInfo(jobId, ses, null);
}
}
- @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
@@ -271,10 +273,9 @@
Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Channel channel = null;
@@ -287,7 +288,7 @@
throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
}
if (listener != null) {
- alreadyActive = listener.isEntityActive();
+ alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Channel " + channelName + " is already running");
@@ -304,15 +305,14 @@
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
- metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+ metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
- JobSpecification channeljobSpec =
- createChannelJob(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, stats,
- dataverse);
+ JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
+ tempMdProvider, hcc, hdc, stats, dataverse);
// Now we subscribe
if (listener == null) {
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index d203905..1c497a8 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -26,10 +26,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
@@ -41,6 +40,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
@@ -52,7 +52,6 @@
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -96,7 +95,8 @@
this.period = (CallExpr) period;
}
- @Override public byte getKind() {
+ @Override
+ public byte getKind() {
return Kind.EXTENSION;
}
@@ -108,7 +108,8 @@
return signature;
}
- @Override public byte getCategory() {
+ @Override
+ public byte getCategory() {
return Category.DDL;
}
@@ -116,7 +117,8 @@
return period;
}
- @Override public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
return null;
}
@@ -167,9 +169,10 @@
throw new CompilationException("Procedure can only execute a single statement");
}
if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
- return new Pair<>(((QueryTranslator) statementExecutor)
- .handleInsertUpsertStatement(metadataProvider, fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC,
- null, stats, true, null, null), PrecompiledType.INSERT);
+ return new Pair<>(
+ ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null),
+ PrecompiledType.INSERT);
} else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
Pair<JobSpecification, PrecompiledType> pair =
new Pair<>(compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) fStatements.get(0)),
@@ -179,8 +182,8 @@
} else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor();
fStatements.get(0).accept(visitor, null);
- return new Pair<>(((QueryTranslator) statementExecutor)
- .handleDeleteStatement(metadataProvider, fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+ return new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider,
+ fStatements.get(0), hcc, true), PrecompiledType.DELETE);
} else {
throw new CompilationException("Procedure can only execute a single delete, insert, or query");
}
@@ -193,18 +196,18 @@
listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, jobId, resultSetId));
}
- @Override public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
initialize();
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Procedure procedure = null;
@@ -212,13 +215,13 @@
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- procedure = BADLangExtension
- .getProcedure(mdTxnCtx, dataverse, signature.getName(), Integer.toString(signature.getArity()));
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse, signature.getName(),
+ Integer.toString(signature.getArity()));
if (procedure != null) {
throw new AlgebricksException("A procedure with this name " + signature.getName() + " already exists.");
}
if (listener != null) {
- alreadyActive = listener.isEntityActive();
+ alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Procedure " + signature.getName() + " is already running");
@@ -228,7 +231,7 @@
Function.RETURNTYPE_VOID, getFunctionBody(), Function.LANGUAGE_AQL, duration);
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
- metadataProvider.getDefaultDataverse(), metadataProvider.getStorageComponentProvider());
+ metadataProvider.getDefaultDataverse());
tempMdProvider.setConfig(metadataProvider.getConfig());
metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 9bf3718..e60570c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -21,11 +21,10 @@
import java.util.EnumSet;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
@@ -95,13 +94,12 @@
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(dataverseName));
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, procedureName);
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index f7c3a74..a1bf0d3 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -18,10 +18,9 @@
*/
package org.apache.asterix.bad.lang.statement;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.lang.BADLangExtension;
@@ -82,16 +81,15 @@
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
FunctionSignature signature = getFunctionSignature();
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new Identifier(signature.getNamespace()));
signature.setNamespace(dataverse);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverse, signature.getName());
- PrecompiledJobEventListener listener =
- (PrecompiledJobEventListener) activeEventHandler.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
Procedure procedure = null;
MetadataTransactionContext mdTxnCtx = null;
@@ -115,7 +113,7 @@
}
JobId hyracksJobId = listener.getJobId();
listener.deActivate();
- activeEventHandler.removeListener(listener);
+ activeEventHandler.unregisterListener(listener);
if (hyracksJobId != null) {
hcc.destroyJob(hyracksJobId);
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index cd4470f..cd2ff86 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -42,8 +42,8 @@
public class BADMetadataExtension implements IMetadataExtension {
- public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
- BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+ public static final ExtensionId BAD_METADATA_EXTENSION_ID =
+ new ExtensionId(BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
index d3d2e66..de1aab8 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -19,8 +19,8 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index e6d0249..d577260 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -19,9 +19,9 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.ARecord;
import org.apache.asterix.om.base.AString;
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 5eb18d1..8e0cf5f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -18,27 +18,30 @@
*/
package org.apache.asterix.bad.metadata;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveEvent.Kind;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-public class PrecompiledJobEventListener extends ActiveEntityEventsListener {
- private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
+public class PrecompiledJobEventListener implements IActiveEntityEventsListener {
- private ScheduledExecutorService executorService = null;
- private ResultReader resultReader;
+ private static final Logger LOGGER = Logger.getLogger(PrecompiledJobEventListener.class);
public enum PrecompiledType {
CHANNEL,
@@ -47,13 +50,117 @@
DELETE
}
+ enum RequestState {
+ INIT,
+ STARTED,
+ FINISHED
+ }
+
+ private ScheduledExecutorService executorService = null;
+ private ResultReader resultReader;
private final PrecompiledType type;
+ // members
+ protected volatile ActivityState state;
+ protected JobId jobId;
+ protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>();
+ protected final ICcApplicationContext appCtx;
+ protected final EntityId entityId;
+ protected final List<IDataset> datasets;
+ protected final ActiveEvent statsUpdatedEvent;
+ protected long statsTimestamp;
+ protected String stats;
+ protected RequestState statsRequestState;
+ protected final String runtimeName;
+ protected final AlgebricksAbsolutePartitionConstraint locations;
+ protected int numRegistered;
public PrecompiledJobEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type,
List<IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
- super(appCtx, entityId, datasets, locations, runtimeName);
+ this.appCtx = appCtx;
+ this.entityId = entityId;
+ this.datasets = datasets;
+ this.state = ActivityState.STOPPED;
+ this.statsTimestamp = -1;
+ this.statsRequestState = RequestState.INIT;
+ this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null);
+ this.stats = "{\"Stats\":\"N/A\"}";
+ this.runtimeName = runtimeName;
+ this.locations = locations;
+ this.numRegistered = 0;
state = ActivityState.STOPPED;
this.type = type;
+ }
+
+ protected synchronized void handle(ActivePartitionMessage message) {
+ if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+ numRegistered++;
+ if (numRegistered == locations.getLocations().length) {
+ state = ActivityState.RUNNING;
+ }
+ }
+ }
+
+ @Override
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ @Override
+ public ActivityState getState() {
+ return state;
+ }
+
+ @Override
+ public boolean isEntityUsingDataset(IDataset dataset) {
+ return datasets.contains(dataset);
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public String getStats() {
+ return stats;
+ }
+
+ @Override
+ public long getStatsTimeStamp() {
+ return statsTimestamp;
+ }
+
+ public String formatStats(List<String> responses) {
+ StringBuilder strBuilder = new StringBuilder();
+ strBuilder.append("{\"Stats\": [").append(responses.get(0));
+ for (int i = 1; i < responses.size(); i++) {
+ strBuilder.append(", ").append(responses.get(i));
+ }
+ strBuilder.append("]}");
+ return strBuilder.toString();
+ }
+
+ protected synchronized void notifySubscribers(ActiveEvent event) {
+ notifyAll();
+ Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
+ while (it.hasNext()) {
+ IActiveEntityEventSubscriber subscriber = it.next();
+ if (subscriber.isDone()) {
+ it.remove();
+ } else {
+ try {
+ subscriber.notify(event);
+ } catch (HyracksDataException e) {
+ LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
+ }
+ if (subscriber.isDone()) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ public AlgebricksAbsolutePartitionConstraint getLocations() {
+ return locations;
}
public ResultReader getResultReader() {
@@ -72,10 +179,6 @@
public ScheduledExecutorService getExecutorService() {
return executorService;
- }
-
- public boolean isEntityActive() {
- return state == ActivityState.STARTED;
}
public void deActivate() {
@@ -110,7 +213,7 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job started for " + entityId);
}
- state = ActivityState.STARTED;
+ state = ActivityState.RUNNING;
}
private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
@@ -120,7 +223,21 @@
}
@Override
- public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException {
+ public synchronized void subscribe(IActiveEntityEventSubscriber subscriber) throws HyracksDataException {
// no op
}
+
+ @Override
+ public boolean isActive() {
+ return state == ActivityState.RUNNING;
+ }
+
+ @Override
+ public void unregister() throws HyracksDataException {
+ }
+
+ @Override
+ public Exception getJobFailure() {
+ return null;
+ }
}
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
index f324a46..1aa633f 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -26,8 +26,8 @@
import java.util.List;
import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb-bad[master]: Separate Predistributed Jobs from other Active Jobs
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.
Change subject: Separate Predistributed Jobs from other Active Jobs
......................................................................
Patch Set 1:
(1 comment)
https://asterix-gerrit.ics.uci.edu/#/c/1894/1/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
File asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java:
PS1, Line 116: unregisterListener
> This is a new interface change? Should I wait after the main one is merged
yes please. I just renamed to make it symmetric. register/unregister sounds better than register/remove
--
To view, visit https://asterix-gerrit.ics.uci.edu/1894
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I18de53d55bcd2b423b9c8f4521fc8c3db1f432e1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes