You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/09/09 20:41:50 UTC
[11/12] asterixdb-bad git commit: Initial commit
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
new file mode 100644
index 0000000..bfa6bf1
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.FieldBinding;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.UpsertStatement;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Extension statement that subscribes a broker to a channel by writing one subscription record into the
+ * channel's subscriptions dataset.
+ * <p>
+ * When no subscription id is supplied the record is inserted; when one is supplied the record is upserted
+ * with that id wrapped in a UUID constructor call.
+ */
+public class ChannelSubscribeStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final Identifier brokerDataverseName;
+    private final Identifier brokerName;
+    private final List<Expression> argList;
+    private final String subscriptionId;
+    private final int varCounter;
+
+    /**
+     * @param dataverseName dataverse of the channel
+     * @param channelName name of the channel to subscribe to
+     * @param argList argument expressions for the channel function, stored as fields param0..paramN-1
+     * @param varCounter variable counter forwarded to the generated query and insert/upsert statement
+     * @param brokerDataverseName dataverse of the broker
+     * @param brokerName name of the broker that subscribes
+     * @param subscriptionId existing subscription id to upsert, or {@code null} to insert a new subscription
+     */
+    public ChannelSubscribeStatement(Identifier dataverseName, Identifier channelName, List<Expression> argList,
+            int varCounter, Identifier brokerDataverseName, Identifier brokerName, String subscriptionId) {
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.brokerDataverseName = brokerDataverseName;
+        this.brokerName = brokerName;
+        this.argList = argList;
+        this.subscriptionId = subscriptionId;
+        this.varCounter = varCounter;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getBrokerDataverseName() {
+        return brokerDataverseName;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    public Identifier getBrokerName() {
+        return brokerName;
+    }
+
+    public List<Expression> getArgList() {
+        return argList;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.QUERY;
+    }
+
+    /**
+     * Visitors are not supported by this statement: the visitor is never invoked and {@code null} is returned.
+     */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /**
+     * Looks up the channel and the broker, checks that the number of arguments matches the arity of the
+     * channel function, and then inserts (no subscription id) or upserts (subscription id given) the
+     * subscription record into the channel's subscriptions dataset.
+     *
+     * @throws HyracksDataException wrapping any exception raised while processing the statement, including
+     *             the "no channel" / "no broker" / wrong-argument-count errors
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+            throws HyracksDataException, AlgebricksException {
+
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+
+            Channel channel = BADLangExtension.getChannel(mdTxnCtx, dataverseName.getValue(), channelName.getValue());
+            if (channel == null) {
+                throw new AsterixException("There is no channel with this name " + channelName + ".");
+            }
+            Broker broker = BADLangExtension.getBroker(mdTxnCtx, brokerDataverseName.getValue(), brokerName.getValue());
+            if (broker == null) {
+                throw new AsterixException("There is no broker with this name " + brokerName + ".");
+            }
+
+            String subscriptionsDatasetName = channel.getSubscriptionsDataset();
+            List<String> returnField = new ArrayList<String>();
+            returnField.add(BADConstants.SubscriptionId);
+
+            if (argList.size() != channel.getFunction().getArity()) {
+                throw new AsterixException("Channel expected " + channel.getFunction().getArity()
+                        + " parameters but got " + argList.size());
+            }
+
+            Query subscriptionTuple = new Query(false);
+            subscriptionTuple.setBody(buildSubscriptionRecord(broker));
+            subscriptionTuple.setVarCounter(varCounter);
+
+            if (subscriptionId == null) {
+                InsertStatement insert = new InsertStatement(dataverseName, new Identifier(subscriptionsDatasetName),
+                        subscriptionTuple, varCounter, false, returnField);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, insert, hcc, hdc,
+                        resultDelivery, stats, false);
+            } else {
+                UpsertStatement upsert = new UpsertStatement(dataverseName, new Identifier(subscriptionsDatasetName),
+                        subscriptionTuple, varCounter);
+                ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, upsert, hcc, hdc,
+                        resultDelivery, stats, false);
+            }
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // beginTransaction() itself may have failed, in which case there is no transaction to abort.
+            if (mdTxnCtx != null) {
+                QueryTranslator.abort(e, e, mdTxnCtx);
+            }
+            throw new HyracksDataException(e);
+        }
+
+    }
+
+    /**
+     * Builds the record constructor of the subscription tuple: broker dataverse, broker name, the optional
+     * subscription id (as a UUID constructor call) and one {@code param<i>} field per channel argument.
+     */
+    private RecordConstructor buildSubscriptionRecord(Broker broker) {
+        List<FieldBinding> bindings = new ArrayList<FieldBinding>();
+        bindings.add(field(BADConstants.BrokerDataverse, stringLiteral(broker.getDataverseName())));
+        bindings.add(field(BADConstants.BrokerName, stringLiteral(broker.getBrokerName())));
+
+        if (subscriptionId != null) {
+            List<Expression> uuidArgs = new ArrayList<Expression>();
+            uuidArgs.add(stringLiteral(subscriptionId));
+            FunctionIdentifier uuidConstructor = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+            FunctionSignature uuidSignature = new FunctionSignature(uuidConstructor.getNamespace(),
+                    uuidConstructor.getName(), uuidConstructor.getArity());
+            bindings.add(field(BADConstants.SubscriptionId, new CallExpr(uuidSignature, uuidArgs)));
+        }
+
+        for (int i = 0; i < argList.size(); i++) {
+            bindings.add(field("param" + i, argList.get(i)));
+        }
+        return new RecordConstructor(bindings);
+    }
+
+    /** Creates a field binding whose name is the given string literal. */
+    private static FieldBinding field(String name, Expression value) {
+        return new FieldBinding(stringLiteral(name), value);
+    }
+
+    /** Wraps a Java string in a string-literal expression. */
+    private static LiteralExpr stringLiteral(String value) {
+        return new LiteralExpr(new StringLiteral(value));
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
new file mode 100644
index 0000000..17e3ad2
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.List;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Extension statement describing the removal of a subscription from a channel.
+ * <p>
+ * NOTE: {@link #handle} currently performs no work; its intended implementation is kept as a
+ * commented-out draft inside the method body.
+ */
+public class ChannelUnsubscribeStatement implements IExtensionStatement {
+
+    private final Identifier dataverseName;
+    private final Identifier channelName;
+    private final String subscriptionId;
+    private final int varCounter;
+    private final VariableExpr vars;
+    private final List<String> dataverses;
+    private final List<String> datasets;
+
+    /**
+     * @param vars variable expression (used by the draft implementation as the delete variable)
+     * @param dataverseName dataverse of the channel
+     * @param channelName name of the channel
+     * @param subscriptionId id of the subscription to remove
+     * @param varCounter variable counter of the parsed statement
+     * @param dataverses dataverse names carried along with the statement
+     * @param datasets dataset names carried along with the statement
+     */
+    public ChannelUnsubscribeStatement(VariableExpr vars, Identifier dataverseName, Identifier channelName,
+            String subscriptionId, int varCounter, List<String> dataverses, List<String> datasets) {
+        this.vars = vars;
+        this.channelName = channelName;
+        this.dataverseName = dataverseName;
+        this.subscriptionId = subscriptionId;
+        this.varCounter = varCounter;
+        this.dataverses = dataverses;
+        this.datasets = datasets;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public VariableExpr getVariableExpr() {
+        return vars;
+    }
+
+    public Identifier getChannelName() {
+        return channelName;
+    }
+
+    /**
+     * Returns the id of the subscription to remove. Same accessor name as
+     * {@code ChannelSubscribeStatement.getSubscriptionId()}.
+     */
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    /**
+     * Returns the id of the subscription to remove. Retained under its original (mis-cased) name for
+     * backward compatibility; prefer {@link #getSubscriptionId()}.
+     */
+    public String getsubScriptionId() {
+        return subscriptionId;
+    }
+
+    public List<String> getDataverses() {
+        return dataverses;
+    }
+
+    public List<String> getDatasets() {
+        return datasets;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    /**
+     * Visitors are not supported by this statement: the visitor is never invoked and {@code null} is returned.
+     */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /**
+     * Currently a no-op: the whole body is commented out, so executing this statement changes nothing and
+     * reports no error.
+     * <p>
+     * TODO: port the draft below, which builds a delete on the channel's subscriptions dataset filtered
+     * by subscription id. It references helpers that are not in scope in this class.
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+            throws HyracksDataException, AlgebricksException {
+        /*  ChannelUnsubscribeStatement stmtChannelSub = (ChannelUnsubscribeStatement) stmt;
+        String dataverseName = getActiveDataverse(stmtChannelSub.getDataverseName());
+        Identifier channelName = stmtChannelSub.getChannelName();
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.readChannelBegin(dataverseName, dataverseName + "." + channelName);
+        try {
+            Channel channel = MetadataManager.INSTANCE.getChannel(mdTxnCtx, dataverseName, channelName.getValue());
+            if (channel == null) {
+                throw new AsterixException("There is no channel with this name " + channelName + ".");
+            }
+            Identifier subscriptionsDatasetName = new Identifier(channel.getSubscriptionsDataset());
+
+            VariableExpr vars = stmtChannelSub.getVariableExpr();
+
+            //Need a condition to say subscription-id = sid
+            OperatorExpr condition = new OperatorExpr();
+            FieldAccessor fa = new FieldAccessor(vars, new Identifier(ActiveConstants.SubscriptionId));
+            condition.addOperand(fa);
+            condition.setCurrentop(true);
+            condition.addOperator("=");
+
+            String sid = stmtChannelSub.getsubScriptionId();
+            List<Expression> UUIDList = new ArrayList<Expression>();
+            UUIDList.add(new LiteralExpr(new StringLiteral(sid)));
+
+            FunctionIdentifier function = AsterixBuiltinFunctions.UUID_CONSTRUCTOR;
+            FunctionSignature UUIDfunc = new FunctionSignature(function.getNamespace(), function.getName(),
+                    function.getArity());
+            CallExpr UUIDCall = new CallExpr(UUIDfunc, UUIDList);
+
+            condition.addOperand(UUIDCall);
+
+            DeleteStatement delete = new DeleteStatement(vars, new Identifier(dataverseName), subscriptionsDatasetName,
+                    condition, stmtChannelSub.getVarCounter(), stmtChannelSub.getDataverses(),
+                    stmtChannelSub.getDatasets());
+            AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
+            delete.accept(visitor, null);
+
+            handleDeleteStatement(metadataProvider, delete, hcc);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.readChannelEnd(dataverseName, dataverseName + "." + channelName);
+        }*/
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
new file mode 100644
index 0000000..5b480ae
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateBrokerStatement.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Broker;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * DDL extension statement that registers a new {@link Broker} (dataverse, name, endpoint) in the metadata.
+ */
+public class CreateBrokerStatement implements IExtensionStatement {
+
+    private static final Logger LOGGER = Logger.getLogger(CreateBrokerStatement.class.getName());
+
+    private final Identifier dataverseName;
+    private final Identifier brokerName;
+    private String endPointName;
+
+    /**
+     * @param dataverseName dataverse the broker belongs to
+     * @param brokerName name of the broker to create
+     * @param endPointName endpoint stored with the broker
+     */
+    public CreateBrokerStatement(Identifier dataverseName, Identifier brokerName, String endPointName) {
+        this.dataverseName = dataverseName;
+        this.brokerName = brokerName;
+        this.endPointName = endPointName;
+    }
+
+    /** @return the endpoint stored with the broker */
+    public String getEndPointName() {
+        return this.endPointName;
+    }
+
+    /** @return the dataverse the broker belongs to */
+    public Identifier getDataverseName() {
+        return this.dataverseName;
+    }
+
+    /** @return the name of the broker to create */
+    public Identifier getBrokerName() {
+        return this.brokerName;
+    }
+
+    @Override
+    public byte getKind() {
+        return Kind.EXTENSION;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.DDL;
+    }
+
+    /** Visitors are not supported: the visitor is never invoked and {@code null} is returned. */
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    /**
+     * Adds the broker to the metadata inside its own metadata transaction, failing if a broker with the
+     * same dataverse and name is already registered.
+     *
+     * @throws HyracksDataException wrapping any failure, including the "already exists" error; the failure
+     *             is also logged at WARNING level
+     */
+    @Override
+    public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+            throws HyracksDataException, AlgebricksException {
+        MetadataTransactionContext txn = null;
+        try {
+            txn = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(txn);
+
+            final String dataverse = dataverseName.getValue();
+            final String name = brokerName.getValue();
+            final Broker existing = BADLangExtension.getBroker(txn, dataverse, name);
+            if (existing != null) {
+                throw new AlgebricksException("A broker with this name " + brokerName + " already exists.");
+            }
+
+            MetadataManager.INSTANCE.addEntity(txn, new Broker(dataverse, name, endPointName));
+            MetadataManager.INSTANCE.commitTransaction(txn);
+        } catch (Exception failure) {
+            // Only abort when a transaction was actually started.
+            if (txn != null) {
+                QueryTranslator.abort(failure, failure, txn);
+            }
+            LOGGER.log(Level.WARNING, "Failed creating a broker", failure);
+            throw new HyracksDataException(failure);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
new file mode 100644
index 0000000..d862052
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.file.JobSpecificationUtils;
+import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DatasetDecl;
+import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
+import org.apache.asterix.lang.common.statement.InsertStatement;
+import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.util.JobUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
+
+public class CreateChannelStatement implements IExtensionStatement {
+
+ private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
+
+ // Dataverse and name of the channel being created.
+ private final Identifier dataverseName;
+ private final Identifier channelName;
+ // Signature of the function the channel executes for every subscription.
+ private final FunctionSignature function;
+ // Period expression as parsed; initialize() requires it to be a call to "duration".
+ private final CallExpr period;
+ // String argument of the period call; empty until initialize() has run.
+ private String duration;
+ // NOTE(review): not assigned anywhere in the visible part of this class - confirm where it is set.
+ private InsertStatement channelResultsInsertQuery;
+ // Names of the subscriptions and results datasets; set by initialize().
+ private String subscriptionsTableName;
+ private String resultsTableName;
+
+ public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
+ Expression period) {
+ this.channelName = channelName;
+ this.dataverseName = dataverseName;
+ this.function = function;
+ this.period = (CallExpr) period;
+ this.duration = "";
+ }
+
+ /** @return the dataverse of the channel */
+ public Identifier getDataverseName() {
+     return this.dataverseName;
+ }
+
+ /** @return the name of the channel */
+ public Identifier getChannelName() {
+     return this.channelName;
+ }
+
+ /** @return the results dataset name passed to {@code initialize(...)}, or {@code null} before that call */
+ public String getResultsName() {
+     return this.resultsTableName;
+ }
+
+ /** @return the subscriptions dataset name passed to {@code initialize(...)}, or {@code null} before that call */
+ public String getSubscriptionsName() {
+     return this.subscriptionsTableName;
+ }
+
+ /** @return the duration string extracted by {@code initialize(...)}; empty until then */
+ public String getDuration() {
+     return this.duration;
+ }
+
+ /** @return the signature of the function the channel executes */
+ public FunctionSignature getFunction() {
+     return this.function;
+ }
+
+ /** @return the period call expression given at construction */
+ public Expression getPeriod() {
+     return this.period;
+ }
+
+ /**
+  * @return the stored results insert statement. NOTE(review): no assignment to this field is visible in
+  *         this part of the class, so this may be {@code null} - confirm.
+  */
+ public InsertStatement getChannelResultsInsertQuery() {
+     return this.channelResultsInsertQuery;
+ }
+
+ /** Reports this statement as belonging to the DDL category. */
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ /** Visitors are not supported: the visitor is never invoked and {@code null} is always returned. */
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName)
+ throws MetadataException, HyracksDataException {
+ Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
+ if (lookup == null) {
+ throw new MetadataException(" Unknown function " + function.getName());
+ }
+
+ if (!period.getFunctionSignature().getName().equals("duration")) {
+ throw new MetadataException(
+ "Expected argument period as a duration, but got " + period.getFunctionSignature().getName() + ".");
+ }
+ duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
+ IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(bos);
+ durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
+ this.resultsTableName = resultsTableName;
+ this.subscriptionsTableName = subscriptionsTableName;
+
+ }
+
+ /** Identifies this statement as an extension statement. */
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ /**
+  * Creates a fresh job specification whose single root is the channel operator returned by
+  * {@code buildChannelRuntime(...)}, constrained to the partition that method selects.
+  *
+  * @return the new job specification paired with the partition constraint of the channel operator
+  */
+ public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildChannelJobSpec(String dataverseName,
+         String channelName, String duration, AqlMetadataProvider metadataProvider, JobSpecification channeljobSpec,
+         String strIP, int port) throws Exception {
+     final JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
+     final Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> runtime = buildChannelRuntime(jobSpec,
+             dataverseName, channelName, duration, channeljobSpec, strIP, port);
+
+     final IOperatorDescriptor channelOperator = runtime.first;
+     final AlgebricksPartitionConstraint operatorConstraint = runtime.second;
+     AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, channelOperator,
+             operatorConstraint);
+     jobSpec.addRoot(channelOperator);
+
+     return new Pair<>(jobSpec, runtime.second);
+ }
+
+ /**
+  * Creates the repetitive channel operator and an absolute partition constraint naming only the first
+  * entry of the cluster locations.
+  *
+  * @return the operator paired with its single-location partition constraint
+  */
+ public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> buildChannelRuntime(
+         JobSpecification jobSpec, String dataverseName, String channelName, String duration,
+         JobSpecification channeljobSpec, String strIP, int port) throws Exception {
+     final RepetitiveChannelOperatorDescriptor operator = new RepetitiveChannelOperatorDescriptor(jobSpec,
+             dataverseName, channelName, duration, channeljobSpec, strIP, port);
+
+     // Only the first cluster location is used.
+     final String firstLocation = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations()[0];
+     final AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(
+             new String[] { firstLocation });
+
+     return new Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>(operator, constraint);
+ }
+
+ private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+ Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+ IHyracksDataset hdc, Stats stats) throws AsterixException, Exception {
+
+ Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
+ Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
+ //Setup the subscriptions dataset
+ List<List<String>> partitionFields = new ArrayList<List<String>>();
+ List<Integer> keyIndicators = new ArrayList<Integer>();
+ keyIndicators.add(0);
+ List<String> fieldNames = new ArrayList<String>();
+ fieldNames.add(BADConstants.SubscriptionId);
+ partitionFields.add(fieldNames);
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
+ DatasetDecl createSubscriptionsDataset = new DatasetDecl(dataverseName, subscriptionsName,
+ new Identifier("Metadata"), subscriptionsTypeName, null, null, null, null,
+ new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
+
+ //Setup the results dataset
+ partitionFields = new ArrayList<List<String>>();
+ fieldNames = new ArrayList<String>();
+ fieldNames.add(BADConstants.ResultId);
+ partitionFields.add(fieldNames);
+ idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
+ DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, resultsName, new Identifier("Metadata"),
+ resultsTypeName, null, null, null, null, new HashMap<String, String>(), new HashMap<String, String>(),
+ DatasetType.INTERNAL, idd, true);
+
+ //Run both statements to create datasets
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
+ hcc);
+ ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
+
+ }
+
+ /**
+  * Generates the AQL insert statement that joins the subscriptions dataset with Metadata.Broker,
+  * applies the channel function to each subscription's stored parameters and writes the results into
+  * the results dataset; then parses it and hands the first parsed statement to the query translator.
+  * <p>
+  * The function is called with {@code $sub.param0 .. $sub.param<arity-1>}; a zero-arity function is
+  * called with an empty argument list.
+  *
+  * @return the job specification returned by the query translator for the generated insert
+  */
+ private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
+         Identifier resultsName, AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+         IHyracksDataset hdc, Stats stats) throws Exception {
+     StringBuilder builder = new StringBuilder();
+     builder.append("insert into dataset " + dataverseName + "." + resultsName + " ");
+     builder.append(" (" + " let $" + BADConstants.ChannelExecutionTime + " := current-datetime() \n");
+
+     builder.append("for $sub in dataset " + dataverseName + "." + subscriptionsName + "\n");
+     builder.append("for $broker in dataset Metadata.Broker\n");
+     builder.append("where $broker." + BADConstants.BrokerName + "= $sub." + BADConstants.BrokerName + "\n");
+     builder.append("and $broker." + BADConstants.BrokerDataverse + "= $sub." + BADConstants.BrokerDataverse + "\n");
+     builder.append(" for $result in " + function.getNamespace() + "." + function.getName() + "(");
+     // Emit exactly one argument per declared parameter; a zero-arity function gets no arguments
+     // (previously "$sub.param0" was appended even when the arity was 0).
+     for (int i = 0; i < function.getArity(); i++) {
+         if (i > 0) {
+             builder.append(",");
+         }
+         builder.append("$sub.param" + i);
+     }
+     builder.append(")\n");
+     builder.append("return {\n");
+     builder.append("\"" + BADConstants.ChannelExecutionTime + "\":$" + BADConstants.ChannelExecutionTime + ",");
+     builder.append("\"" + BADConstants.SubscriptionId + "\":$sub." + BADConstants.SubscriptionId + ",");
+     builder.append("\"" + BADConstants.DeliveryTime + "\":current-datetime(),");
+     builder.append("\"result\":$result");
+     builder.append("}");
+     builder.append(")");
+     builder.append(" return records");
+     builder.append(";");
+     AQLParserFactory aqlFact = new AQLParserFactory();
+     List<Statement> fStatements = aqlFact.createParser(new StringReader(builder.toString())).parse();
+     return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(0),
+             hcc, hdc, ResultDelivery.ASYNC, stats, true);
+ }
+
+ /**
+ * Executes the create-channel statement: checks that the channel and its dataset names are free,
+ * registers an event subscriber, creates the subscriptions and results datasets, builds and starts
+ * the channel job, and finally records the channel in the metadata.
+ * <p>
+ * Every exception raised inside the try block, including the AlgebricksException and
+ * AsterixException thrown by the availability checks, is caught, logged and rethrown wrapped in a
+ * HyracksDataException.
+ * <p>
+ * The catch block only calls QueryTranslator.abort for the metadata transaction. Nothing in this
+ * method removes the event subscriber or drops datasets that were already created when a later
+ * step fails (see the first TODO below).
+ */
+ @Override
+ public void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+ throws HyracksDataException, AlgebricksException {
+
+ //This function performs three tasks:
+ //1. Create datasets for the Channel
+ //2. Create the compiled Channel Job
+ //3. Create the metadata entry for the channel
+
+ //TODO: Figure out how to handle when a subset of the 3 tasks fails
+ //TODO: The compiled job will break if anything changes to the function or two datasets
+ // Need to make sure we do proper checking when altering these things
+
+ // Dataset names are derived from the channel name plus fixed suffixes.
+ Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
+ Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
+ EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName.getValue(),
+ channelName.getValue());
+ // Look up any listener already registered for this entity; it may be null.
+ ChannelEventsListener listener = (ChannelEventsListener) ActiveJobNotificationHandler.INSTANCE
+ .getActiveEntityListener(entityId);
+ IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
+ boolean subscriberRegistered = false;
+ Channel channel = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ // Refuse to create a channel whose metadata entry already exists.
+ channel = BADLangExtension.getChannel(mdTxnCtx, dataverseName.getValue(), channelName.getValue());
+ if (channel != null) {
+ throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
+ }
+ // Refuse as well if an existing listener reports the channel as active.
+ if (listener != null) {
+ subscriberRegistered = listener.isChannelActive(entityId, eventSubscriber);
+ }
+ if (subscriberRegistered) {
+ throw new AsterixException("Channel " + channelName + " is already running");
+ }
+ // NOTE(review): initialize(...) presumably populates subscriptionsTableName, resultsTableName and
+ // duration used on the next statement - confirm in the part of the class not shown here.
+ initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
+ channel = new Channel(dataverseName.getValue(), channelName.getValue(), subscriptionsTableName,
+ resultsTableName, function, duration);
+
+ //check if names are available before creating anything
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName.getValue(),
+ subscriptionsName.getValue()) != null) {
+ throw new AsterixException("The channel name:" + channelName + " is not available.");
+ }
+ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName.getValue(),
+ resultsName.getValue()) != null) {
+ throw new AsterixException("The channel name:" + channelName + " is not available.");
+ }
+
+ // Now we subscribe
+ // Create and register a listener if none existed, then attach this statement's subscriber to it.
+ if (listener == null) {
+ listener = new ChannelEventsListener(entityId);
+ ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
+ }
+ listener.registerEventSubscriber(eventSubscriber);
+ subscriberRegistered = true;
+
+ //Create Channel Datasets
+ createDatasets(statementExecutor, subscriptionsName, resultsName, metadataProvider, hcc, hdc, stats);
+
+ //Create Channel Internal Job
+ JobSpecification channeljobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
+ metadataProvider, hcc, hdc, stats);
+
+ //Create Channel Operator
+ // The cluster controller's client address and port are passed into the channel job spec.
+ ICCApplicationContext iCCApp = AsterixAppContextInfo.INSTANCE.getCCApplicationContext();
+ ClusterControllerInfo ccInfo = iCCApp.getCCContext().getClusterControllerInfo();
+ String strIP = ccInfo.getClientNetAddress();
+ int port = ccInfo.getClientNetPort();
+ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> alteredJobSpec = buildChannelJobSpec(
+ dataverseName.getValue(), channelName.getValue(), duration, metadataProvider, channeljobSpec, strIP,
+ port);
+
+ channel.setPartitionConstraint(alteredJobSpec.second);
+
+ // Attach the job info to the spec under the active-entity property, then submit the job.
+ ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null, ActivityState.ACTIVE,
+ alteredJobSpec.first);
+ alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, channelJobInfo);
+ // NOTE(review): the 'false' argument presumably means "do not wait for completion" - confirm.
+ JobUtils.runJob(hcc, alteredJobSpec.first, false);
+
+ // NOTE(review): presumably blocks until the job-started event is observed - confirm.
+ eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+ // The metadata entry is written last, after the job has been started.
+ MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ // mdTxnCtx is still null only if beginTransaction() itself failed.
+ if (mdTxnCtx != null) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ LOGGER.log(Level.WARNING, "Failed creating a channel", e);
+ throw new HyracksDataException(e);
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
new file mode 100644
index 0000000..9129b0f
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.rmi.RemoteException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.api.ExtensionId;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.IMetadataEntity;
+import org.apache.asterix.metadata.api.IMetadataExtension;
+import org.apache.asterix.metadata.api.IMetadataIndex;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+ /**
+  * Metadata extension for BAD. It exposes the channel and broker metadata datasets and, on a new
+  * universe, inserts them together with the BAD dataverse and the channel subscription/result
+  * datatypes.
+  */
+ public class BADMetadataExtension implements IMetadataExtension {
+
+     public static final ExtensionId BAD_METADATA_EXTENSION_ID = new ExtensionId(
+             BADConstants.BAD_METADATA_EXTENSION_NAME, 0);
+     public static final Dataverse BAD_DATAVERSE = new Dataverse(BADConstants.BAD_DATAVERSE_NAME,
+             NonTaggedDataFormat.class.getName(), IMetadataEntity.PENDING_NO_OP);
+
+     public static final Datatype BAD_SUBSCRIPTION_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+             BADConstants.ChannelSubscriptionsType, BADMetadataRecordTypes.channelSubscriptionsType, false);
+     public static final Datatype BAD_RESULT_DATATYPE = new Datatype(BADConstants.BAD_DATAVERSE_NAME,
+             BADConstants.ChannelResultsType, BADMetadataRecordTypes.channelResultsType, false);
+
+     @Override
+     public ExtensionId getId() {
+         return BAD_METADATA_EXTENSION_ID;
+     }
+
+     /** No configuration arguments are consumed by this extension. */
+     @Override
+     public void configure(List<Pair<String, String>> args) {
+         // do nothing??
+     }
+
+     @Override
+     public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() {
+         return new MetadataTupleTranslatorProvider();
+     }
+
+     /**
+      * Returns the channel and broker metadata datasets, in that order.
+      * <p>
+      * Any Throwable raised while touching the {@link BADMetadataIndexes} constants is printed to
+      * stderr and then rethrown unchanged.
+      */
+     @SuppressWarnings("rawtypes")
+     @Override
+     public List<ExtensionMetadataDataset> getExtensionIndexes() {
+         try {
+             return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET);
+         } catch (Throwable th) {
+             th.printStackTrace();
+             throw th;
+         }
+     }
+
+     /**
+      * Enlists the two BAD metadata datasets and, when the universe is new, inserts the datasets, the
+      * BAD dataverse and the two channel datatypes in a single metadata transaction.
+      * <p>
+      * NOTE: if any insert fails, the exception is printed and the transaction is aborted, but the
+      * failure is not propagated to the caller.
+      */
+     @Override
+     public void initializeMetadata() throws HyracksDataException, RemoteException, ACIDException {
+         // enlist datasets
+         MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.CHANNEL_DATASET);
+         MetadataBootstrap.enlistMetadataDataset(BADMetadataIndexes.BROKER_DATASET);
+         if (MetadataBootstrap.isNewUniverse()) {
+             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+             try {
+                 // add metadata datasets
+                 MetadataBootstrap.insertMetadataDatasets(mdTxnCtx,
+                         new IMetadataIndex[] { BADMetadataIndexes.CHANNEL_DATASET, BADMetadataIndexes.BROKER_DATASET });
+                 // insert default dataverse
+                 // TODO prevent user from dropping this dataverse
+                 MetadataManager.INSTANCE.addDataverse(mdTxnCtx, BAD_DATAVERSE);
+                 // insert default data type
+                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_RESULT_DATATYPE);
+                 MetadataManager.INSTANCE.addDatatype(mdTxnCtx, BAD_SUBSCRIPTION_DATATYPE);
+                 // TODO prevent user from dropping these types
+                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+             } catch (Exception e) {
+                 e.printStackTrace();
+                 MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+             }
+         }
+         // local recovery?
+         // nothing for now
+     }
+ }
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
new file mode 100644
index 0000000..188f04f
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataIndexes.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.Arrays;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+ /**
+  * Constants describing the two BAD extension metadata datasets, {@code Channel} and {@code Broker}.
+  * Both are keyed on (dataverse name, entity name).
+  */
+ public class BADMetadataIndexes {
+
+     public static final String INDEX_NAME_CHANNEL = "Channel";
+     public static final String INDEX_NAME_BROKER = "Broker";
+
+     public static final ExtensionMetadataDatasetId BAD_CHANNEL_INDEX_ID = new ExtensionMetadataDatasetId(
+             BADMetadataExtension.BAD_METADATA_EXTENSION_ID, INDEX_NAME_CHANNEL);
+     // Channel takes the first id available to extensions.
+     public static final MetadataIndexImmutableProperties PROPERTIES_CHANNEL = new MetadataIndexImmutableProperties(
+             INDEX_NAME_CHANNEL, MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID,
+             MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID);
+
+     public static final ExtensionMetadataDatasetId BAD_BROKER_INDEX_ID = new ExtensionMetadataDatasetId(
+             BADMetadataExtension.BAD_METADATA_EXTENSION_ID, INDEX_NAME_BROKER);
+     // Broker takes the id right after the channel's, so the two datasets no longer share the same
+     // pair of id arguments.
+     // NOTE(review): the two numeric arguments are presumably dataset id and resource id - confirm.
+     public static final MetadataIndexImmutableProperties PROPERTIES_BROKER = new MetadataIndexImmutableProperties(
+             INDEX_NAME_BROKER, MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1,
+             MetadataIndexImmutableProperties.FIRST_AVAILABLE_EXTENSION_METADATA_DATASET_ID + 1);
+
+     // Tuple layout for both datasets: two string key fields followed by the record payload.
+     public static final int NUM_FIELDS_CHANNEL_IDX = 3;
+     public static final int NUM_FIELDS_BROKER_IDX = 3;
+
+     @SuppressWarnings({ "rawtypes", "unchecked" })
+     public static final ExtensionMetadataDataset CHANNEL_DATASET = new ExtensionMetadataDataset(PROPERTIES_CHANNEL,
+             NUM_FIELDS_CHANNEL_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+             Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                     Arrays.asList(BADMetadataRecordTypes.FIELD_NAME_CHANNEL_NAME)),
+             0, BADMetadataRecordTypes.CHANNEL_RECORDTYPE, true, new int[] { 0, 1 }, BAD_CHANNEL_INDEX_ID,
+             new ChannelTupleTranslator(true));
+
+     @SuppressWarnings({ "rawtypes", "unchecked" })
+     public static final ExtensionMetadataDataset BROKER_DATASET = new ExtensionMetadataDataset(PROPERTIES_BROKER,
+             NUM_FIELDS_BROKER_IDX, new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING },
+             Arrays.asList(Arrays.asList(MetadataRecordTypes.FIELD_NAME_DATAVERSE_NAME),
+                     Arrays.asList(BADMetadataRecordTypes.FIELD_NAME_BROKER_NAME)),
+             0, BADMetadataRecordTypes.BROKER_RECORDTYPE, true, new int[] { 0, 1 }, BAD_BROKER_INDEX_ID,
+             new BrokerTupleTranslator(true));
+
+ }
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
new file mode 100644
index 0000000..d95570f
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+
+ /**
+  * Field names, field positions and record types for the BAD metadata records (channels and brokers)
+  * and for the per-channel subscriptions and results datasets.
+  */
+ public class BADMetadataRecordTypes {
+
+     // --------------------------------------- Fields Names --------------------------------------//
+     public static final String FIELD_NAME_BROKER_NAME = "BrokerName";
+     public static final String FIELD_NAME_BROKER_ENDPOINT = "BrokerEndPoint";
+     public static final String FIELD_NAME_CHANNEL_DURATION = "Duration";
+     public static final String FIELD_NAME_CHANNEL_FUNCTION = "Function";
+     public static final String FIELD_NAME_CHANNEL_NAME = "ChannelName";
+     public static final String FIELD_NAME_CHANNEL_RESULTS_DATASET = "ResultsDatasetName";
+     public static final String FIELD_NAME_CHANNEL_SUBSCRIPTIONS_DATASET = "SubscriptionsDatasetName";
+     public static final String FIELD_NAME_DATAVERSE_NAME = "DataverseName";
+
+     // -------------------------------------- Subscriptions --------------------------------------//
+     private static final String[] subTypeFieldNames = { BADConstants.BrokerDataverse, BADConstants.BrokerName,
+             BADConstants.SubscriptionId };
+     private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID };
+     public static final ARecordType channelSubscriptionsType = new ARecordType(BADConstants.ChannelSubscriptionsType,
+             subTypeFieldNames, subTypeFieldTypes, true);
+
+     // ---------------------------------------- Results --------------------------------------------//
+     private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime,
+             BADConstants.SubscriptionId, BADConstants.DeliveryTime };
+     private static final IAType[] resultTypeFieldTypes = { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID,
+             BuiltinType.ADATETIME };
+     public static final ARecordType channelResultsType = new ARecordType(BADConstants.ChannelResultsType,
+             resultTypeFieldNames, resultTypeFieldTypes, true);
+
+     //------------------------------------------ Channel ----------------------------------------//
+     public static final String RECORD_NAME_CHANNEL = "ChannelRecordType";
+     public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
+     public static final int CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX = 1;
+     public static final int CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX = 2;
+     public static final int CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX = 3;
+     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
+     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
+     public static final ARecordType CHANNEL_RECORDTYPE = MetadataRecordTypes.createRecordType(
+             // RecordTypeName
+             RECORD_NAME_CHANNEL,
+             // FieldNames
+             new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_CHANNEL_NAME, FIELD_NAME_CHANNEL_SUBSCRIPTIONS_DATASET,
+                     FIELD_NAME_CHANNEL_RESULTS_DATASET, FIELD_NAME_CHANNEL_FUNCTION, FIELD_NAME_CHANNEL_DURATION },
+             // FieldTypes
+             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                     BuiltinType.ASTRING, BuiltinType.ASTRING },
+             //IsOpen?
+             true);
+     //------------------------------------------ Broker ----------------------------------------//
+     public static final String RECORD_NAME_BROKER = "BrokerRecordType";
+     public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+     public static final int BROKER_NAME_FIELD_INDEX = 1;
+     public static final int BROKER_ENDPOINT_FIELD_INDEX = 2;
+     public static final ARecordType BROKER_RECORDTYPE = MetadataRecordTypes.createRecordType(
+             // RecordTypeName
+             RECORD_NAME_BROKER,
+             // FieldNames
+             new String[] { FIELD_NAME_DATAVERSE_NAME, FIELD_NAME_BROKER_NAME, FIELD_NAME_BROKER_ENDPOINT },
+             // FieldTypes: one entry per field name above (the broker record has three fields)
+             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+             //IsOpen?
+             true);
+
+ }
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/Broker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Broker.java b/src/main/java/org/apache/asterix/bad/metadata/Broker.java
new file mode 100644
index 0000000..006f0dc
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/Broker.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+
+/**
+ * Metadata describing a broker.
+ */
+ /**
+  * Metadata describing a broker: the dataverse it belongs to, its name and its endpoint.
+  * <p>
+  * Two brokers are equal when both their dataverse name and broker name match; the endpoint is not
+  * part of the identity.
+  */
+ public class Broker implements IExtensionMetadataEntity {
+
+     private static final long serialVersionUID = 1L;
+
+     private final String dataverseName;
+     private final String brokerName;
+     private final String endPointName;
+
+     public Broker(String dataverseName, String brokerName, String endPointName) {
+         this.endPointName = endPointName;
+         this.dataverseName = dataverseName;
+         this.brokerName = brokerName;
+     }
+
+     public String getDataverseName() {
+         return dataverseName;
+     }
+
+     public String getBrokerName() {
+         return brokerName;
+     }
+
+     public String getEndPointName() {
+         return endPointName;
+     }
+
+     /**
+      * Compares brokers by (dataverse name, broker name), the same pair used as the search key for the
+      * broker metadata dataset. The comparison is null-safe.
+      */
+     @Override
+     public boolean equals(Object other) {
+         if (this == other) {
+             return true;
+         }
+         if (!(other instanceof Broker)) {
+             return false;
+         }
+         Broker otherBroker = (Broker) other;
+         return java.util.Objects.equals(dataverseName, otherBroker.dataverseName)
+                 && java.util.Objects.equals(brokerName, otherBroker.brokerName);
+     }
+
+     /** Hash over the same fields as {@link #equals(Object)} so equal brokers hash alike. */
+     @Override
+     public int hashCode() {
+         return java.util.Objects.hash(dataverseName, brokerName);
+     }
+
+     @Override
+     public ExtensionMetadataDatasetId getDatasetId() {
+         return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+     }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java b/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
new file mode 100644
index 0000000..b73e9e3
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BrokerSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class BrokerSearchKey implements IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+ private final String broker;
+
+ public BrokerSearchKey(String dataverse, String broker) {
+ this.dataverse = dataverse;
+ this.broker = broker;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_BROKER_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse, broker);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java b/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
new file mode 100644
index 0000000..2b478f2
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/BrokerTupleTranslator.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
+import org.apache.asterix.om.base.ARecord;
+import org.apache.asterix.om.base.AString;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Channel metadata entity to an ITupleReference and vice versa.
+ */
+public class BrokerTupleTranslator extends AbstractTupleTranslator<Broker> {
+ // Field indexes of serialized Broker in a tuple.
+ // Key field.
+ public static final int BROKER_DATAVERSE_NAME_FIELD_INDEX = 0;
+
+ public static final int BROKER_NAME_FIELD_INDEX = 1;
+
+ // Payload field containing serialized broker.
+ public static final int BROKER_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+ @SuppressWarnings("unchecked")
+ public BrokerTupleTranslator(boolean getTuple) {
+ super(getTuple, BADMetadataIndexes.BROKER_DATASET.getFieldCount());
+ }
+
+ @Override
+ public Broker getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
+ byte[] serRecord = frameTuple.getFieldData(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = frameTuple.getFieldStart(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = frameTuple.getFieldLength(BROKER_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord channelRecord = recordSerDes.deserialize(in);
+ return createBrokerFromARecord(channelRecord);
+ }
+
+ private Broker createBrokerFromARecord(ARecord brokerRecord) {
+ Broker broker = null;
+ String dataverseName = ((AString) brokerRecord
+ .getValueByPos(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+ String brokerName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX))
+ .getStringValue();
+ String endPointName = ((AString) brokerRecord.getValueByPos(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX))
+ .getStringValue();
+
+ broker = new Broker(dataverseName, brokerName, endPointName);
+ return broker;
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(Broker broker) throws IOException, MetadataException {
+ // write the key in the first fields of the tuple
+
+ tupleBuilder.reset();
+ aString.setValue(broker.getDataverseName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ aString.setValue(broker.getBrokerName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ recordBuilder.reset(BADMetadataRecordTypes.BROKER_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(broker.getDataverseName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.BROKER_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(broker.getBrokerName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.BROKER_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aString.setValue(broker.getEndPointName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(BADMetadataRecordTypes.BROKER_ENDPOINT_FIELD_INDEX, fieldValue);
+
+ // write record
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/Channel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/src/main/java/org/apache/asterix/bad/metadata/Channel.java
new file mode 100644
index 0000000..1025748
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2015 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
+/**
+ * Metadata describing a channel.
+ */
+/**
+ * Metadata describing a channel.
+ * <p>
+ * A channel is identified by its {@link EntityId}, which is built from the BAD channel
+ * extension name, the dataverse name and the channel name. Equality and hashing consider
+ * only that identifier; the dataset names, duration, function and partition constraint do
+ * not take part in the comparison.
+ */
+public class Channel implements IExtensionMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    /** A unique identifier for the channel */
+    protected final EntityId channelId;
+    private final String subscriptionsDatasetName;
+    private final String resultsDatasetName;
+    private final String duration;
+    private final FunctionSignature function;
+    /** Mutable; unset ({@code null}) until {@link #setPartitionConstraint} is called. */
+    private AlgebricksAbsolutePartitionConstraint partitionConstraint;
+
+    /**
+     * Creates the metadata entry for a channel.
+     *
+     * @param dataverseName
+     *            dataverse the channel belongs to; becomes part of the channel id
+     * @param channelName
+     *            name of the channel; becomes part of the channel id
+     * @param subscriptionsDataset
+     *            name of the dataset associated with the channel's subscriptions
+     * @param resultsDataset
+     *            name of the dataset associated with the channel's results
+     * @param function
+     *            signature of the function associated with the channel
+     * @param duration
+     *            the channel's duration, kept as the string that was supplied
+     */
+    public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
+            FunctionSignature function, String duration) {
+        this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
+        this.function = function;
+        this.duration = duration;
+        this.resultsDatasetName = resultsDataset;
+        this.subscriptionsDatasetName = subscriptionsDataset;
+    }
+
+    /** @return the identifier built from extension name, dataverse name and channel name */
+    public EntityId getChannelId() {
+        return channelId;
+    }
+
+    /** @return the subscriptions dataset name passed to the constructor */
+    public String getSubscriptionsDataset() {
+        return subscriptionsDatasetName;
+    }
+
+    /** @return the results dataset name passed to the constructor */
+    public String getResultsDatasetName() {
+        return resultsDatasetName;
+    }
+
+    /** @return the duration string passed to the constructor */
+    public String getDuration() {
+        return duration;
+    }
+
+    /** @return the function signature passed to the constructor */
+    public FunctionSignature getFunction() {
+        return function;
+    }
+
+    /**
+     * Two channels are equal when their channel ids are equal.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof Channel)) {
+            return false;
+        }
+        return ((Channel) other).channelId.equals(channelId);
+    }
+
+    /**
+     * Hashes on the channel id only, so that channels which compare equal also share a
+     * hash code and behave correctly as keys or members of hash-based collections.
+     */
+    @Override
+    public int hashCode() {
+        return channelId.hashCode();
+    }
+
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+    }
+
+    /** @return the partition constraint last set, or {@code null} if none was set */
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return partitionConstraint;
+    }
+
+    /**
+     * @param partitionConstraint
+     *            the partition constraint to remember for this channel
+     */
+    public void setPartitionConstraint(AlgebricksAbsolutePartitionConstraint partitionConstraint) {
+        this.partitionConstraint = partitionConstraint;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
new file mode 100644
index 0000000..b48478d
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveJob;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobInfo;
+import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.log4j.Logger;
+
+/**
+ * Listens for active-job events of a single channel entity.
+ * <p>
+ * Jobs whose specification contains a {@link RepetitiveChannelOperatorDescriptor} are
+ * registered on creation, marked {@link ActivityState#ACTIVE} once their start event
+ * arrives, and dropped again on their finish event. Registered
+ * {@link IActiveLifecycleEventSubscriber}s are told about each of these transitions.
+ */
+public class ChannelEventsListener implements IActiveEntityEventsListener {
+    private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);
+    private final List<IActiveLifecycleEventSubscriber> subscribers;
+    /** Registered jobs, keyed by the numeric id of the Hyracks job. */
+    private final Map<Long, ActiveJob> jobs;
+    /** The same registered jobs, keyed by the channel entity they belong to. */
+    private final Map<EntityId, ChannelJobInfo> jobInfos;
+    private final EntityId entityId;
+
+    /**
+     * @param entityId
+     *            the channel entity this listener is responsible for
+     */
+    public ChannelEventsListener(EntityId entityId) {
+        this.entityId = entityId;
+        subscribers = new ArrayList<>();
+        jobs = new HashMap<>();
+        jobInfos = new HashMap<>();
+    }
+
+    /**
+     * Dispatches a job start or finish event. Any exception raised while handling the
+     * event is logged here and not propagated to the caller.
+     */
+    @Override
+    public void notify(ActiveEvent event) {
+        try {
+            switch (event.getEventKind()) {
+                case JOB_START:
+                    handleJobStartEvent(event);
+                    break;
+                case JOB_FINISH:
+                    handleJobFinishEvent(event);
+                    break;
+                default:
+                    LOGGER.warn("Unknown Channel Event" + event);
+                    break;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unhandled Exception", e);
+        }
+    }
+
+    private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        if (jobInfo == null) {
+            // Nothing was registered for this job id; there is no state to update.
+            LOGGER.warn("Ignoring start event for unregistered channel job " + message.getJobId());
+            return;
+        }
+        handleJobStartMessage((ChannelJobInfo) jobInfo);
+    }
+
+    private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
+        ActiveJob jobInfo = jobs.get(message.getJobId().getId());
+        if (jobInfo == null) {
+            // Nothing was registered for this job id; there is nothing to clean up.
+            LOGGER.warn("Ignoring finish event for unregistered channel job " + message.getJobId());
+            return;
+        }
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Channel Job finished for " + jobInfo);
+        }
+        handleJobFinishMessage((ChannelJobInfo) jobInfo);
+    }
+
+    /**
+     * Forgets a finished job and tells subscribers whether it ended or failed, based on
+     * the job status reported by the Hyracks client connection.
+     */
+    private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
+        EntityId channelJobId = cInfo.getEntityId();
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        JobStatus status = info.getStatus();
+        boolean failure = status != null && status.equals(JobStatus.FAILURE);
+
+        jobInfos.remove(channelJobId);
+        jobs.remove(cInfo.getJobId().getId());
+        // notify event listeners
+        ActiveLifecycleEvent event = failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED
+                : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
+        notifyEventSubscribers(event);
+    }
+
+    private void notifyEventSubscribers(ActiveLifecycleEvent event) {
+        // subscribers is final and assigned in the constructor, so it is never null here
+        for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+            subscriber.handleEvent(event);
+        }
+    }
+
+    /**
+     * Records where the channel operators of a started job run and marks the job active.
+     */
+    private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
+        // collect the ids of all channel operators in the job specification
+        List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
+        Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
+                channelOperatorIds.add(opDesc.getOperatorId());
+            }
+        }
+
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
+        JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+        List<String> locations = new ArrayList<>();
+        for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
+            Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
+            int nOperatorInstances = operatorLocations.size();
+            for (int i = 0; i < nOperatorInstances; i++) {
+                locations.add(operatorLocations.get(i));
+            }
+        }
+        // locations is an ordered list;
+        // for each channel operator, instance i's location is appended in order of i
+        cInfo.setLocations(locations);
+        cInfo.setState(ActivityState.ACTIVE);
+    }
+
+    /**
+     * Registers the job if its specification contains a channel operator; jobs without
+     * one are ignored. Registration failures are logged and not propagated.
+     */
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec) {
+        try {
+            for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
+                if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
+                    EntityId channelId = ((RepetitiveChannelOperatorDescriptor) opDesc).getEntityId();
+                    registerJob(channelId, jobId, spec);
+                    return;
+                }
+            }
+        } catch (Exception e) {
+            // pass the throwable separately so the stack trace is kept in the log
+            LOGGER.error("Unable to register channel job " + jobId, e);
+        }
+    }
+
+    /**
+     * Registers a newly created channel job in state {@link ActivityState#CREATED} and
+     * notifies subscribers with {@code ACTIVE_JOB_STARTED}.
+     *
+     * @param entityId
+     *            the channel the job belongs to
+     * @param jobId
+     *            id of the job
+     * @param jobSpec
+     *            specification of the job
+     * @throws IllegalStateException
+     *             if a job is already registered under this job id or for this channel
+     */
+    public synchronized void registerJob(EntityId entityId, JobId jobId, JobSpecification jobSpec) {
+        if (jobs.get(jobId.getId()) != null) {
+            throw new IllegalStateException("Channel job already registered");
+        }
+        // jobInfos is keyed by EntityId, so the lookup has to use the entity, not the job id
+        if (jobInfos.containsKey(entityId)) {
+            throw new IllegalStateException("Channel job already registered");
+        }
+
+        ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
+        jobs.put(jobId.getId(), cInfo);
+        jobInfos.put(entityId, cInfo);
+
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
+        }
+
+        notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
+
+    }
+
+    /**
+     * @return the specification of the job registered for the given channel
+     * @throws NullPointerException
+     *             if no job is registered for {@code activeJobId}
+     */
+    public JobSpecification getJobSpecification(EntityId activeJobId) {
+        return jobInfos.get(activeJobId).getSpec();
+    }
+
+    /** @return the job registered for the given channel, or {@code null} if there is none */
+    public ChannelJobInfo getJobInfo(EntityId activeJobId) {
+        return jobInfos.get(activeJobId);
+    }
+
+    public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+        subscribers.add(subscriber);
+    }
+
+    /**
+     * Removes a subscriber. Takes the same lock as {@link #registerEventSubscriber} and
+     * the notifying code paths so the list is not modified while it is being iterated.
+     */
+    public synchronized void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
+        subscribers.remove(subscriber);
+    }
+
+    /**
+     * Reports whether the job registered for the channel is in state
+     * {@link ActivityState#ACTIVE}; if it is, {@code eventSubscriber} is registered as a
+     * side effect.
+     */
+    public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
+        ChannelJobInfo cInfo = jobInfos.get(activeJobId);
+        boolean active = cInfo != null && cInfo.getState().equals(ActivityState.ACTIVE);
+        if (active) {
+            registerEventSubscriber(eventSubscriber);
+        }
+        return active;
+    }
+
+    /** Same lookup as {@link #getJobInfo(EntityId)}. */
+    public ChannelJobInfo getFeedConnectJobInfo(EntityId activeJobId) {
+        return jobInfos.get(activeJobId);
+    }
+
+    /**
+     * NOTE(review): jobInfos is keyed by EntityId, yet its keys are copied into a
+     * FeedConnectionId[]. Unless the stored keys are FeedConnectionId instances this
+     * throws ArrayStoreException as soon as one job is registered; confirm whether this
+     * method is still needed.
+     */
+    public FeedConnectionId[] getConnections() {
+        return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
+    }
+
+    /** @return {@code true} while at least one job is registered */
+    @Override
+    public boolean isEntityActive() {
+        return !jobs.isEmpty();
+    }
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    /**
+     * A channel uses exactly two datasets in its own dataverse: the ones named after the
+     * channel with the subscription and results suffixes from {@link BADConstants}.
+     */
+    @Override
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
+        if (!entityId.getDataverse().equals(dataverseName)) {
+            return false;
+        }
+        String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
+        String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
+        return datasetName.equals(subscriptionsName) || datasetName.equals(resultsName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/bdba1b86/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
new file mode 100644
index 0000000..679548c
--- /dev/null
+++ b/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Search key for the BAD channel metadata index, made up of a dataverse name and a
+ * channel name.
+ */
+public class ChannelSearchKey implements IExtensionMetadataSearchKey {
+    private static final long serialVersionUID = 1L;
+
+    /** First component of the key. */
+    private final String dataverse;
+    /** Second component of the key. */
+    private final String channel;
+
+    /**
+     * @param dataverse
+     *            dataverse name to search for
+     * @param channel
+     *            channel name to search for
+     */
+    public ChannelSearchKey(String dataverse, String channel) {
+        this.dataverse = dataverse;
+        this.channel = channel;
+    }
+
+    /** Builds the search tuple from the dataverse name followed by the channel name. */
+    @Override
+    public ITupleReference getSearchKey() {
+        final ITupleReference searchTuple = MetadataNode.createTuple(this.dataverse, this.channel);
+        return searchTuple;
+    }
+
+    /** Identifies the channel index as the dataset this key is used against. */
+    @Override
+    public ExtensionMetadataDatasetId getDatasetId() {
+        final ExtensionMetadataDatasetId channelIndexId = BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
+        return channelIndexId;
+    }
+}