You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/12/07 20:59:40 UTC
[2/7] asterixdb-bad git commit: Updated to match code changes to
asterix
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/Channel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/src/main/java/org/apache/asterix/bad/metadata/Channel.java
deleted file mode 100644
index b201af6..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2009-2015 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.asterix.bad.metadata;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
-import org.apache.asterix.metadata.api.IExtensionMetadataEntity;
-
-/**
- * Metadata describing a channel.
- */
-public class Channel implements IExtensionMetadataEntity {
-
- private static final long serialVersionUID = 1L;
-
- /** A unique identifier for the channel */
- protected final EntityId channelId;
- private final String subscriptionsDatasetName;
- private final String resultsDatasetName;
- private final String duration;
- private final FunctionSignature function;
-
- public Channel(String dataverseName, String channelName, String subscriptionsDataset, String resultsDataset,
- FunctionSignature function, String duration) {
- this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
- this.function = function;
- this.duration = duration;
- this.resultsDatasetName = resultsDataset;
- this.subscriptionsDatasetName = subscriptionsDataset;
- }
-
- public EntityId getChannelId() {
- return channelId;
- }
-
- public String getSubscriptionsDataset() {
- return subscriptionsDatasetName;
- }
-
- public String getResultsDatasetName() {
- return resultsDatasetName;
- }
-
- public String getDuration() {
- return duration;
- }
-
- public FunctionSignature getFunction() {
- return function;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (!(other instanceof Channel)) {
- return false;
- }
- Channel otherDataset = (Channel) other;
- if (!otherDataset.channelId.equals(channelId)) {
- return false;
- }
- return true;
- }
-
- @Override
- public ExtensionMetadataDatasetId getDatasetId() {
- return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
deleted file mode 100644
index 82c97c8..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveJob;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.runtime.util.AsterixAppContextInfo;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.job.JobStatus;
-import org.apache.log4j.Logger;
-
-public class ChannelEventsListener implements IActiveEntityEventsListener {
- private static final Logger LOGGER = Logger.getLogger(ChannelEventsListener.class);
- private final List<IActiveLifecycleEventSubscriber> subscribers;
- private final Map<Long, ActiveJob> jobs;
- private final Map<EntityId, ChannelJobInfo> jobInfos;
- private EntityId entityId;
-
- public ChannelEventsListener(EntityId entityId) {
- this.entityId = entityId;
- subscribers = new ArrayList<>();
- jobs = new HashMap<>();
- jobInfos = new HashMap<>();
- }
-
- @Override
- public void notify(ActiveEvent event) {
- try {
- switch (event.getEventKind()) {
- case JOB_START:
- handleJobStartEvent(event);
- break;
- case JOB_FINISH:
- handleJobFinishEvent(event);
- break;
- default:
- LOGGER.warn("Unknown Channel Event" + event);
- break;
- }
- } catch (Exception e) {
- LOGGER.error("Unhandled Exception", e);
- }
- }
-
- private synchronized void handleJobStartEvent(ActiveEvent message) throws Exception {
- ActiveJob jobInfo = jobs.get(message.getJobId().getId());
- handleJobStartMessage((ChannelJobInfo) jobInfo);
- }
-
- private synchronized void handleJobFinishEvent(ActiveEvent message) throws Exception {
- ActiveJob jobInfo = jobs.get(message.getJobId().getId());
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Channel Job finished for " + jobInfo);
- }
- handleJobFinishMessage((ChannelJobInfo) jobInfo);
- }
-
- private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo) throws Exception {
- EntityId channelJobId = cInfo.getEntityId();
-
- IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
- JobInfo info = hcc.getJobInfo(cInfo.getJobId());
- JobStatus status = info.getStatus();
- boolean failure = status != null && status.equals(JobStatus.FAILURE);
-
- jobInfos.remove(channelJobId);
- jobs.remove(cInfo.getJobId().getId());
- // notify event listeners
- ActiveLifecycleEvent event = failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED
- : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
- notifyEventSubscribers(event);
- }
-
- private void notifyEventSubscribers(ActiveLifecycleEvent event) {
- if (subscribers != null && !subscribers.isEmpty()) {
- for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
- subscriber.handleEvent(event);
- }
- }
- }
-
- private static synchronized void handleJobStartMessage(ChannelJobInfo cInfo) throws Exception {
- List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
- Map<OperatorDescriptorId, IOperatorDescriptor> operators = cInfo.getSpec().getOperatorMap();
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
- IOperatorDescriptor opDesc = entry.getValue();
- if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
- channelOperatorIds.add(opDesc.getOperatorId());
- }
- }
-
- IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
- JobInfo info = hcc.getJobInfo(cInfo.getJobId());
- List<String> locations = new ArrayList<>();
- for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
- Map<Integer, String> operatorLocations = info.getOperatorLocations().get(channelOperatorId);
- int nOperatorInstances = operatorLocations.size();
- for (int i = 0; i < nOperatorInstances; i++) {
- locations.add(operatorLocations.get(i));
- }
- }
- cInfo.setLocations(locations);
- cInfo.setState(ActivityState.ACTIVE);
- }
-
- @Override
- public void notifyJobCreation(JobId jobId, JobSpecification spec) {
- EntityId channelId = null;
- try {
- for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
- if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
- channelId = ((RepetitiveChannelOperatorDescriptor) opDesc).getEntityId();
- registerJob(channelId, jobId, spec);
- return;
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- }
- }
-
- public synchronized void registerJob(EntityId entityId, JobId jobId, JobSpecification jobSpec) {
- if (jobs.get(jobId.getId()) != null) {
- throw new IllegalStateException("Channel job already registered");
- }
- if (jobInfos.containsKey(jobId.getId())) {
- throw new IllegalStateException("Channel job already registered");
- }
-
- ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId, ActivityState.CREATED, jobSpec);
- jobs.put(jobId.getId(), cInfo);
- jobInfos.put(entityId, cInfo);
-
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Registered channel job [" + jobId + "]" + " for channel " + entityId);
- }
-
- notifyEventSubscribers(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
-
- }
-
- public JobSpecification getJobSpecification(EntityId activeJobId) {
- return jobInfos.get(activeJobId).getSpec();
- }
-
- public ChannelJobInfo getJobInfo(EntityId activeJobId) {
- return jobInfos.get(activeJobId);
- }
-
- public synchronized void registerEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
- subscribers.add(subscriber);
- }
-
- public void deregisterEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
- subscribers.remove(subscriber);
- }
-
- public synchronized boolean isChannelActive(EntityId activeJobId, IActiveLifecycleEventSubscriber eventSubscriber) {
- boolean active = false;
- ChannelJobInfo cInfo = jobInfos.get(activeJobId);
- if (cInfo != null) {
- active = cInfo.getState().equals(ActivityState.ACTIVE);
- }
- if (active) {
- registerEventSubscriber(eventSubscriber);
- }
- return active;
- }
-
- public FeedConnectionId[] getConnections() {
- return jobInfos.keySet().toArray(new FeedConnectionId[jobInfos.size()]);
- }
-
- @Override
- public boolean isEntityActive() {
- return !jobs.isEmpty();
- }
-
- @Override
- public EntityId getEntityId() {
- return entityId;
- }
-
- @Override
- public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
- if (entityId.getDataverse().equals(dataverseName)) {
- String subscriptionsName = entityId.getEntityName() + BADConstants.subscriptionEnding;
- String resultsName = entityId.getEntityName() + BADConstants.resultsEnding;
- if (datasetName.equals(subscriptionsName) || datasetName.equals(resultsName)) {
- return true;
- }
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
deleted file mode 100644
index 679548c..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/ChannelSearchKey.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.metadata;
-
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
-import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public class ChannelSearchKey implements IExtensionMetadataSearchKey {
- private static final long serialVersionUID = 1L;
- private final String dataverse;
- private final String channel;
-
- public ChannelSearchKey(String dataverse, String channel) {
- this.dataverse = dataverse;
- this.channel = channel;
- }
-
- @Override
- public ExtensionMetadataDatasetId getDatasetId() {
- return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID;
- }
-
- @Override
- public ITupleReference getSearchKey() {
- return MetadataNode.createTuple(dataverse, channel);
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
deleted file mode 100644
index 18b2067..0000000
--- a/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.asterix.bad.metadata;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator;
-import org.apache.asterix.om.base.ARecord;
-import org.apache.asterix.om.base.AString;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-/**
- * Translates a Channel metadata entity to an ITupleReference and vice versa.
- */
-public class ChannelTupleTranslator extends AbstractTupleTranslator<Channel> {
- // Field indexes of serialized Feed in a tuple.
- // Key field.
- public static final int CHANNEL_DATAVERSE_NAME_FIELD_INDEX = 0;
-
- public static final int CHANNEL_NAME_FIELD_INDEX = 1;
-
- // Payload field containing serialized feed.
- public static final int CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX = 2;
-
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
-
- @SuppressWarnings("unchecked")
- public ChannelTupleTranslator(boolean getTuple) {
- super(getTuple, BADMetadataIndexes.NUM_FIELDS_CHANNEL_IDX);
- }
-
- @Override
- public Channel getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException {
- byte[] serRecord = frameTuple.getFieldData(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordStartOffset = frameTuple.getFieldStart(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
- int recordLength = frameTuple.getFieldLength(CHANNEL_PAYLOAD_TUPLE_FIELD_INDEX);
- ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
- DataInput in = new DataInputStream(stream);
- ARecord channelRecord = recordSerDes.deserialize(in);
- return createChannelFromARecord(channelRecord);
- }
-
- private Channel createChannelFromARecord(ARecord channelRecord) {
- Channel channel = null;
- String dataverseName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
- String channelName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX)).getStringValue();
- String subscriptionsName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX)).getStringValue();
- String resultsName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX)).getStringValue();
- String fName = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX)).getStringValue();
- String duration = ((AString) channelRecord
- .getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX)).getStringValue();
-
- FunctionSignature signature = null;
-
- String[] qnameComponents = fName.split("\\.");
- String functionDataverse;
- String functionName;
- if (qnameComponents.length == 2) {
- functionDataverse = qnameComponents[0];
- functionName = qnameComponents[1];
- } else {
- functionDataverse = dataverseName;
- functionName = qnameComponents[0];
- }
-
- String[] nameComponents = functionName.split("@");
- signature = new FunctionSignature(functionDataverse, nameComponents[0], Integer.parseInt(nameComponents[1]));
-
- channel = new Channel(dataverseName, channelName, subscriptionsName, resultsName, signature, duration);
- return channel;
- }
-
- @Override
- public ITupleReference getTupleFromMetadataEntity(Channel channel) throws IOException, MetadataException {
- // write the key in the first fields of the tuple
-
- tupleBuilder.reset();
- aString.setValue(channel.getChannelId().getDataverse());
- stringSerde.serialize(aString, tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
-
- aString.setValue(channel.getChannelId().getEntityName());
- stringSerde.serialize(aString, tupleBuilder.getDataOutput());
- tupleBuilder.addFieldEndOffset();
-
- recordBuilder.reset(BADMetadataRecordTypes.CHANNEL_RECORDTYPE);
-
- // write field 0
- fieldValue.reset();
- aString.setValue(channel.getChannelId().getDataverse());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
-
- // write field 1
- fieldValue.reset();
- aString.setValue(channel.getChannelId().getEntityName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_CHANNEL_NAME_FIELD_INDEX, fieldValue);
-
- // write field 2
- fieldValue.reset();
- aString.setValue(channel.getSubscriptionsDataset());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_SUBSCRIPTIONS_NAME_FIELD_INDEX, fieldValue);
-
- // write field 3
- fieldValue.reset();
- aString.setValue(channel.getResultsDatasetName());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_RESULTS_NAME_FIELD_INDEX, fieldValue);
-
- // write field 4
- fieldValue.reset();
- aString.setValue(channel.getFunction().toString());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_FUNCTION_FIELD_INDEX, fieldValue);
-
- // write field 5
- fieldValue.reset();
- aString.setValue(channel.getDuration());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DURATION_FIELD_INDEX, fieldValue);
-
- // write record
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
-
- tupleBuilder.addFieldEndOffset();
-
- tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
- return tuple;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
deleted file mode 100644
index 8e19fc0..0000000
--- a/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.rules;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.algebra.operators.CommitOperator;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.runtime.NotifyBrokerOperator;
-import org.apache.asterix.bad.runtime.NotifyBrokerPOperator;
-import org.apache.asterix.lang.common.util.FunctionUtil;
-import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- return false;
- }
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
- if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
- return false;
- }
- AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
- return false;
- }
- ExtensionOperator eOp = (ExtensionOperator) op;
- if (!(eOp.getDelegate() instanceof CommitOperator)) {
- return false;
- }
- AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
- if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
- return false;
- }
- InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp;
- if (insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) {
- return false;
- }
- DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource();
- String datasetName = dds.getDataset().getDatasetName();
- if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata")
- || !dds.getDataset().getItemTypeName().equals("ChannelResultsType")
- || !datasetName.endsWith("Results")) {
- return false;
- }
- String channelDataverse = dds.getDataset().getDataverseName();
- //Now we know that we are inserting into results
-
- String channelName = datasetName.substring(0, datasetName.length() - 7);
- String subscriptionsName = channelName + "Subscriptions";
- //TODO: Can we check here to see if there is a channel with such a name?
-
- DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName);
- if (subscriptionsScan == null) {
- return false;
- }
-
- //Now we want to make sure and set the commit to be a nonsink commit
- ((CommitOperator) eOp.getDelegate()).setSink(false);
-
- //Now we need to get the broker EndPoint
- LogicalVariable brokerEndpointVar = context.newVar();
- AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers");
- AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan);
- //now brokerNameVar holds the brokerName for use farther up in the plan
-
- //Place assignOp between the scan and the op above it
- assignOp.getInputs().addAll(opAboveBrokersScan.getInputs());
- opAboveBrokersScan.getInputs().clear();
- opAboveBrokersScan.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
- context.computeAndSetTypeEnvironmentForOperator(assignOp);
- context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan);
- context.computeAndSetTypeEnvironmentForOperator(eOp);
-
- //get subscriptionIdVar
- LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0);
-
- //The channelExecutionTime is created just before the scan
- LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue())
- .getVariables().get(0);
-
- ProjectOperator badProject = (ProjectOperator) findOp(op, "project");
- badProject.getVariables().add(subscriptionIdVar);
- badProject.getVariables().add(brokerEndpointVar);
- badProject.getVariables().add(channelExecutionVar);
- context.computeAndSetTypeEnvironmentForOperator(badProject);
-
- //Create my brokerNotify plan above the extension Operator
- ExtensionOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar,
- context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName);
-
- opRef.setValue(dOp);
-
- return true;
- }
-
- private ExtensionOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar,
- LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context,
- ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName)
- throws AlgebricksException {
- //create the Distinct Op
- ArrayList<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
- VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar);
- expressions.add(new MutableObject<ILogicalExpression>(vExpr));
- DistinctOperator distinctOp = new DistinctOperator(expressions);
-
- //create the GroupBy Op
- //And set the distinct as input
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
- List<ILogicalPlan> nestedPlans = new ArrayList<ILogicalPlan>();
-
- //create group by operator
- GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
- groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar));
- groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar));
- groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(distinctOp));
-
- //create nested plan for subscription ids in group by
- NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator(
- new MutableObject<ILogicalOperator>(groupbyOp));
- //TODO: This is from translationcontext. It might be needed to make the variable exist outside of the subplan
- //LogicalVariable subscriptionListVar = context.newSubplanOutputVar();
- LogicalVariable subscriptionListVar = context.newVar();
- List<LogicalVariable> aggVars = new ArrayList<LogicalVariable>();
- aggVars.add(subscriptionListVar);
- AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression(
- AsterixBuiltinFunctions.LISTIFY, new ArrayList<Mutable<ILogicalExpression>>());
- funAgg.getArguments()
- .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(subscriptionIdVar)));
- List<Mutable<ILogicalExpression>> aggExpressions = new ArrayList<Mutable<ILogicalExpression>>();
- aggExpressions.add(new MutableObject<ILogicalExpression>(funAgg));
- AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions);
- listifyOp.getInputs().add(new MutableObject<ILogicalOperator>(nestedTupleSourceOp));
-
- //add nested plans
- nestedPlans.add(new ALogicalPlanImpl(new MutableObject<ILogicalOperator>(listifyOp)));
-
- //Create the NotifyBrokerOperator
- NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar,
- channelExecutionVar);
- EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName);
- NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId);
- notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp);
- ExtensionOperator extensionOp = new ExtensionOperator(notifyBrokerOp);
- extensionOp.setPhysicalOperator(notifyBrokerPOp);
- extensionOp.getInputs().add(new MutableObject<ILogicalOperator>(groupbyOp));
-
- //Set the input for the brokerNotify as the replicate operator
- distinctOp.getInputs().add(new MutableObject<ILogicalOperator>(eOp));
-
- //compute environment bottom up
-
- context.computeAndSetTypeEnvironmentForOperator(distinctOp);
- context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
- context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp);
- context.computeAndSetTypeEnvironmentForOperator(listifyOp);
- context.computeAndSetTypeEnvironmentForOperator(extensionOp);
-
- return extensionOp;
-
- }
-
- @SuppressWarnings("unchecked")
- private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar,
- AbstractLogicalOperator opAboveBrokersScan) {
- Mutable<ILogicalExpression> fieldRef = new MutableObject<ILogicalExpression>(
- new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint))));
- DataSourceScanOperator brokerScan = null;
- for (Mutable<ILogicalOperator> subOp : opAboveBrokersScan.getInputs()) {
- if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
- brokerScan = (DataSourceScanOperator) subOp.getValue();
- }
- }
- Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
- new VariableReferenceExpression(brokerScan.getVariables().get(2)));
-
- ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression(
- FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef);
- ArrayList<LogicalVariable> varArray = new ArrayList<LogicalVariable>(1);
- varArray.add(brokerEndpointVar);
- ArrayList<Mutable<ILogicalExpression>> exprArray = new ArrayList<Mutable<ILogicalExpression>>(1);
- exprArray.add(new MutableObject<ILogicalExpression>(fieldAccessByName));
- return new AssignOperator(varArray, exprArray);
- }
-
- /*This function searches for the needed op
- * If lookingForBrokers, find the op above the brokers scan
- * Else find the suscbriptionsScan
- */
- private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) {
- if (!op.hasInputs()) {
- return null;
- }
- for (Mutable<ILogicalOperator> subOp : op.getInputs()) {
- if (lookingForString.equals("brokers")) {
- if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) {
- return op;
- } else {
- AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
- if (nestedOp != null) {
- return nestedOp;
- }
- }
-
- } else if (lookingForString.equals("project")) {
- if (subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) {
- return (AbstractLogicalOperator) subOp.getValue();
- } else {
- AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
- if (nestedOp != null) {
- return nestedOp;
- }
- }
- }
-
- else {
- if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) {
- return (AbstractLogicalOperator) subOp.getValue();
- } else {
- AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(),
- lookingForString);
- if (nestedOp != null) {
- return nestedOp;
- }
- }
-
- }
- }
- return null;
- }
-
- private boolean isBrokerScan(AbstractLogicalOperator op) {
- if (op instanceof DataSourceScanOperator) {
- if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
- DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
- if (dds.getDataset().getDataverseName().equals("Metadata")
- && dds.getDataset().getDatasetName().equals("Broker")) {
- return true;
- }
- }
- }
- return false;
- }
-
- private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) {
- if (op instanceof DataSourceScanOperator) {
- if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) {
- DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource();
- if (dds.getDataset().getItemTypeDataverseName().equals("Metadata")
- && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) {
- if (dds.getDataset().getDatasetName().equals(subscriptionsName)) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
deleted file mode 100644
index c680988..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.Collection;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-
-/**
- * A repetitive channel operator, which uses a Java timer to run a given query periodically
- */
-public class NotifyBrokerOperator extends AbstractExtensibleLogicalOperator {
- private final LogicalVariable subscriptionIdVar;
- private final LogicalVariable brokerEndpointVar;
- private final LogicalVariable channelExecutionVar;
-
- public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar,
- LogicalVariable resultSetVar) {
- this.brokerEndpointVar = brokerEndpointVar;
- this.subscriptionIdVar = subscriptionIdVar;
- this.channelExecutionVar = resultSetVar;
- }
-
- public LogicalVariable getSubscriptionVariable() {
- return subscriptionIdVar;
- }
-
- public LogicalVariable getBrokerEndpointVariable() {
- return brokerEndpointVar;
- }
-
- public LogicalVariable getChannelExecutionVariable() {
- return channelExecutionVar;
- }
-
- @Override
- public String toString() {
- return "notify-brokers";
- }
-
- @Override
- public boolean isMap() {
- return false;
- }
-
- @Override
- public IOperatorExtension newInstance() {
- return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar);
- }
-
- @Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
- throws AlgebricksException {
- return false;
- }
-
- @Override
- public void getUsedVariables(Collection<LogicalVariable> usedVars) {
- usedVars.add(subscriptionIdVar);
- usedVars.add(brokerEndpointVar);
- usedVars.add(channelExecutionVar);
- }
-
- @Override
- public void getProducedVariables(Collection<LogicalVariable> producedVars) {
- // none produced
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
deleted file mode 100644
index 753ece7..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.bad.runtime;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class NotifyBrokerPOperator extends AbstractPhysicalOperator {
-
- private final EntityId entityId;
-
- public NotifyBrokerPOperator(EntityId entityId) {
- this.entityId = entityId;
- }
-
- @Override
- public PhysicalOperatorTag getOperatorTag() {
- return PhysicalOperatorTag.EXTENSION_OPERATOR;
- }
-
- @Override
- public String toString() {
- return "NOTIFY_BROKERS";
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- return emptyUnaryRequirements();
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
- public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- ExtensionOperator notify = (ExtensionOperator) op;
- LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable();
- LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable();
- LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable();
-
- int brokerColumn = inputSchemas[0].findVariable(brokerVar);
- int subColumn = inputSchemas[0].findVariable(subVar);
- int executionColumn = inputSchemas[0].findVariable(executionVar);
-
- IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn);
- IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn);
- IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn);
-
- NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory,
- channelExecutionEvalFactory, entityId);
-
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
- context);
-
- builder.contributeMicroOperator(op, runtime, recDesc);
-
- // and contribute one edge from its child
- ILogicalOperator src = op.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, notify, 0);
- }
-
- @Override
- public boolean isMicroOperator() {
- return true;
- }
-
- @Override
- public boolean expensiveThanMaterialization() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
deleted file mode 100644
index d55080c..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.bad.runtime;
-
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
-import org.apache.asterix.om.base.ADateTime;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.types.AOrderedListType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-
-public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
-
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream di = new DataInputStream(bbis);
- private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer(
- new AOrderedListType(BuiltinType.AUUID, null));
-
- private IPointable inputArg0 = new VoidPointable();
- private IPointable inputArg1 = new VoidPointable();
- private IPointable inputArg2 = new VoidPointable();
- private IScalarEvaluator eval0;
- private IScalarEvaluator eval1;
- private IScalarEvaluator eval2;
- private final ActiveManager activeManager;
- private final EntityId entityId;
- private ChannelJobService channelJobService;
-
- public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory,
- IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory,
- EntityId activeJobId) throws AlgebricksException {
- this.tRef = new FrameTupleReference();
- eval0 = brokerEvalFactory.createScalarEvaluator(ctx);
- eval1 = subEvalFactory.createScalarEvaluator(ctx);
- eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx);
- this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject()).getActiveManager();
- this.entityId = activeJobId;
- channelJobService = new ChannelJobService();
- }
-
- @Override
- public void open() throws HyracksDataException {
- return;
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- try {
- eval0.evaluate(tRef, inputArg0);
- eval1.evaluate(tRef, inputArg1);
- eval2.evaluate(tRef, inputArg2);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- int serBrokerOffset = inputArg0.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1);
- AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di);
-
- int serSubOffset = inputArg1.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1);
- AOrderedList subs = subSerDes.deserialize(di);
-
- int resultSetOffset = inputArg2.getStartOffset();
- bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1);
- ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di);
- String executionTimeString = executionTime.toSimpleString();
-
- channelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs,
- executionTimeString);
-
- }
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- return;
- }
-
- @Override
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
- this.inputRecordDesc = recordDescriptor;
- this.tAccess = new FrameTupleAccessor(inputRecordDesc);
- }
-
- @Override
- public void flush() throws HyracksDataException {
- return;
- }
-
- @Override
- public void fail() throws HyracksDataException {
- failed = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
deleted file mode 100644
index d5452d4..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.bad.runtime;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final IScalarEvaluatorFactory brokerEvalFactory;
- private final IScalarEvaluatorFactory subEvalFactory;
- private final IScalarEvaluatorFactory channelExecutionEvalFactory;
- private final EntityId entityId;
-
- public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory,
- IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) {
- this.brokerEvalFactory = brokerEvalFactory;
- this.subEvalFactory = subEvalFactory;
- this.channelExecutionEvalFactory = channelExecutionEvalFactory;
- this.entityId = entityId;
- }
-
- @Override
- public String toString() {
- return "notify-broker";
- }
-
- @Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
- return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId);
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
deleted file mode 100644
index f3b0a90..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * A repetitive channel operator, which uses a Java timer to run a given query periodically
- */
-public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
-
- /** The unique identifier of the job. **/
- protected final EntityId entityId;
-
- protected final JobSpecification jobSpec;
-
- private final String duration;
-
- private String strIP;
- private int port;
-
- public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName,
- String duration, JobSpecification channeljobSpec, String strIP, int port) {
- super(spec, 0, 0);
- this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName);
- this.jobSpec = channeljobSpec;
- this.duration = duration;
- this.strIP = strIP;
- this.port = port;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
- RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition);
- try {
- return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
- }
-
- public String getDuration() {
- return duration;
- }
-
- public EntityId getEntityId() {
- return entityId;
- }
-
- public JobSpecification getJobSpec() {
- return jobSpec;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
deleted file mode 100644
index 873d2e7..0000000
--- a/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
-import org.apache.asterix.bad.ChannelJobService;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable {
-
- private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
-
- private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
- private final JobSpecification jobSpec;
- private long duration;
- private ChannelJobService channelJobService;
- private String strIP;
- private int port;
-
- public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId,
- JobSpecification channeljobSpec, String duration, String strIP, int port) throws AsterixException {
- super(ctx, runtimeId);
- this.jobSpec = channeljobSpec;
- this.duration = findPeriod(duration);
- //TODO: we should share channelJobService as a single instance
- //And only create one hcc
- channelJobService = new ChannelJobService();
- this.strIP = strIP;
- this.port = port;
- }
-
- public void executeJob() throws Exception {
- LOGGER.info("Executing Job: " + runtimeId.toString());
- channelJobService.runChannelJob(jobSpec, strIP, port);
- }
-
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
- throws HyracksDataException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void start() throws HyracksDataException, InterruptedException {
- scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- executeJob();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, duration, duration, TimeUnit.MILLISECONDS);
-
- while (!scheduledExecutorService.isTerminated()) {
-
- }
-
- }
-
- @Override
- protected void abort() throws HyracksDataException, InterruptedException {
- scheduledExecutorService.shutdown();
- }
-
- private long findPeriod(String duration) {
- //TODO: Allow Repetitive Channels to use YMD durations
- String hoursMinutesSeconds = "";
- if (duration.indexOf('T') != -1) {
- hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 1);
- }
- double seconds = 0;
- if (hoursMinutesSeconds != "") {
- int pos = 0;
- if (hoursMinutesSeconds.indexOf('H') != -1) {
- Double hours = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('H')));
- seconds += (hours * 60 * 60);
- pos = hoursMinutesSeconds.indexOf('H') + 1;
-
- }
- if (hoursMinutesSeconds.indexOf('M') != -1) {
- Double minutes = Double
- .parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('M')));
- seconds += (minutes * 60);
- pos = hoursMinutesSeconds.indexOf('M') + 1;
- }
- if (hoursMinutesSeconds.indexOf('S') != -1) {
- Double s = Double.parseDouble(hoursMinutesSeconds.substring(pos, hoursMinutesSeconds.indexOf('S')));
- seconds += (s);
- }
-
- }
- return (long) (seconds * 1000);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/main/resources/lang-extension/lang.txt
----------------------------------------------------------------------
diff --git a/src/main/resources/lang-extension/lang.txt b/src/main/resources/lang-extension/lang.txt
deleted file mode 100644
index 233ec97..0000000
--- a/src/main/resources/lang-extension/lang.txt
+++ /dev/null
@@ -1,178 +0,0 @@
-import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
-import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
-import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement;
-import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement;
-import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
-import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
-
-
-@merge
-Statement SingleStatement() throws ParseException:
-{
- // merge area 1
- before:
- after:
-}
-{
- (
- // merge area 2
- before:
- after: | stmt = ChannelSubscriptionStatement())
- {
- // merge area 3
- }
-}
-
-@merge
-Statement CreateStatement() throws ParseException:
-{
- // merge area 1
- before:
- after:
-}
-{
- (
- // merge area 2
- before:
- after: | stmt = ChannelSpecification() | stmt = BrokerSpecification())
- {
- // merge area 3
- }
-}
-
-@merge
-Statement DropStatement() throws ParseException:
-{
- // merge area 1
- before:
- after:
-}
-{
- (
- // merge area 2
- before:
- after: | "channel" pairId = QualifiedName() ifExists = IfExists()
- {
- stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists);
- }
- | <BROKER> pairId = QualifiedName() ifExists = IfExists()
- {
- stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
- }
- )
- {
- // merge area 3
- }
-}
-
-@new
-CreateChannelStatement ChannelSpecification() throws ParseException:
-{
- Pair<Identifier,Identifier> nameComponents = null;
- FunctionSignature appliedFunction = null;
- CreateChannelStatement ccs = null;
- String fqFunctionName = null;
- Expression period = null;
-}
-{
- (
- "repetitive" "channel" nameComponents = QualifiedName()
- <USING> appliedFunction = FunctionSignature()
- "period" period = FunctionCallExpr()
- {
- ccs = new CreateChannelStatement(nameComponents.first,
- nameComponents.second, appliedFunction, period);
- }
- )
- {
- return ccs;
- }
-}
-
-@new
-CreateBrokerStatement BrokerSpecification() throws ParseException:
-{
- CreateBrokerStatement cbs = null;
- Pair<Identifier,Identifier> name = null;
- String endPoint = null;
-}
-{
- (
- <BROKER> name = QualifiedName()
- <AT> endPoint = StringLiteral()
- {
- cbs = new CreateBrokerStatement(name.first, name.second,endPoint);
- }
- )
- {
- return cbs;
- }
-}
-
-@new
-Statement ChannelSubscriptionStatement() throws ParseException:
-{
- Statement stmt = null;
- Pair<Identifier,Identifier> nameComponents = null;
- List<Expression> argList = new ArrayList<Expression>();
- Expression tmp = null;
- String id = null;
- String subscriptionId = null;
- Pair<Identifier,Identifier> brokerName = null;
-}
-{
- (
- "subscribe" <TO> nameComponents = QualifiedName()
- <LEFTPAREN> (tmp = Expression()
- {
- argList.add(tmp);
- }
- (<COMMA> tmp = Expression()
- {
- argList.add(tmp);
- }
- )*)? <RIGHTPAREN> <ON> brokerName = QualifiedName()
- {
- stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
- }
- | "unsubscribe" id = StringLiteral() <FROM> nameComponents = QualifiedName()
- {
- setDataverses(new ArrayList<String>());
- setDatasets(new ArrayList<String>());
- VariableExpr varExp = new VariableExpr();
- VarIdentifier var = new VarIdentifier();
- varExp.setVar(var);
- var.setValue("$subscriptionPlaceholder");
- getCurrentScope().addNewVarSymbolToScope(varExp.getVar());
- List<String> dataverses = getDataverses();
- List<String> datasets = getDatasets();
- // we remove the pointer to the dataverses and datasets
- setDataverses(null);
- setDatasets(null);
- stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets);
- }
- | "change" "subscription" subscriptionId = StringLiteral() <ON> nameComponents = QualifiedName()
- <LEFTPAREN> (tmp = Expression()
- {
- argList.add(tmp);
- }
- (<COMMA> tmp = Expression()
- {
- argList.add(tmp);
- }
- )*)? <RIGHTPAREN>
- <TO> brokerName = QualifiedName()
- {
- stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId);
- }
- )
- {
- return stmt;
- }
-}
-
-<DEFAULT,IN_DBL_BRACE>
-TOKEN [IGNORE_CASE]:
-{
- <BROKER : "broker">
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
deleted file mode 100644
index 77e8afe..0000000
--- a/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.AsterixTransactionProperties;
-import org.apache.asterix.test.aql.TestExecutor;
-import org.apache.asterix.test.runtime.ExecutionTestUtil;
-import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.xml.TestGroup;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Runs the runtime test cases under 'src/test/resources/runtimets'.
- */
-@RunWith(Parameterized.class)
-public class BADExecutionTest {
-
- protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName());
-
- protected static final String PATH_ACTUAL = "target/rttest" + File.separator;
- protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" },
- File.separator);
-
- protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
-
- protected static AsterixTransactionProperties txnProperties;
- private static final TestExecutor testExecutor = new TestExecutor();
- private static final boolean cleanupOnStart = true;
- private static final boolean cleanupOnStop = true;
-
- protected static TestGroup FailedGroup;
-
- @BeforeClass
- public static void setUp() throws Exception {
- File outdir = new File(PATH_ACTUAL);
- outdir.mkdirs();
- ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME);
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- ExecutionTestUtil.tearDown(cleanupOnStop);
- ExecutionTestUtil.integrationUtil.removeTestStorageFiles();
- }
-
- @Parameters(name = "BADExecutionTest {index}: {0}")
- public static Collection<Object[]> tests() throws Exception {
- return buildTestsInXml("testsuite.xml");
- }
-
- protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
- Collection<Object[]> testArgs = new ArrayList<Object[]>();
- TestCaseContext.Builder b = new TestCaseContext.Builder();
- for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
- testArgs.add(new Object[] { ctx });
- }
- return testArgs;
-
- }
-
- protected TestCaseContext tcCtx;
-
- public BADExecutionTest(TestCaseContext tcCtx) {
- this.tcCtx = tcCtx;
- }
-
- @Test
- public void test() throws Exception {
- testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup);
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
deleted file mode 100644
index 4949b34..0000000
--- a/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.test;
-
-import java.io.File;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.test.optimizer.OptimizerTest;
-import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class BADOptimizerTest extends OptimizerTest {
-
- private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName());
-
- @BeforeClass
- public static void setUp() throws Exception {
- TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml";
- System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
- final File outdir = new File(PATH_ACTUAL);
- outdir.mkdirs();
-
- integrationUtil.init(true);
- // Set the node resolver to be the identity resolver that expects node names
- // to be node controller ids; a valid assumption in test environment.
- System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY,
- IdentitiyResolverFactory.class.getName());
- }
-
- public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) {
- super(queryFile, expectedFile, actualFile);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/test/resources/conf/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/src/test/resources/conf/asterix-build-configuration.xml b/src/test/resources/conf/asterix-build-configuration.xml
deleted file mode 100644
index c2f5d41..0000000
--- a/src/test/resources/conf/asterix-build-configuration.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<asterixConfiguration xmlns="asterixconf">
- <metadataNode>asterix_nc1</metadataNode>
- <store>
- <ncId>asterix_nc1</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <store>
- <ncId>asterix_nc2</ncId>
- <storeDirs>iodevice0,iodevice1</storeDirs>
- </store>
- <transactionLogDir>
- <ncId>asterix_nc1</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath>
- </transactionLogDir>
- <transactionLogDir>
- <ncId>asterix_nc2</ncId>
- <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath>
- </transactionLogDir>
- <extensions>
- <extension>
- <extensionClassName>org.apache.asterix.bad.lang.BADQueryTranslatorExtension</extensionClassName>
- </extension>
- <extension>
- <extensionClassName>org.apache.asterix.bad.lang.BADLangExtension</extensionClassName>
- </extension>
- <extension>
- <extensionClassName>org.apache.asterix.bad.metadata.BADMetadataExtension</extensionClassName>
- </extension>
- </extensions>
- <property>
- <name>max.wait.active.cluster</name>
- <value>60</value>
- <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all
- nodes are available)
- before a submitted query/statement can be
- executed. (Default = 60 seconds)
- </description>
- </property>
- <property>
- <name>log.level</name>
- <value>WARNING</value>
- <description>Log level for running tests/build</description>
- </property>
- <property>
- <name>compiler.framesize</name>
- <value>32768</value>
- </property>
- <property>
- <name>compiler.sortmemory</name>
- <value>327680</value>
- </property>
- <property>
- <name>compiler.groupmemory</name>
- <value>163840</value>
- </property>
- <property>
- <name>compiler.joinmemory</name>
- <value>163840</value>
- </property>
- <property>
- <name>compiler.pregelix.home</name>
- <value>~/pregelix</value>
- </property>
- <property>
- <name>storage.buffercache.pagesize</name>
- <value>32768</value>
- <description>The page size in bytes for pages in the buffer cache.
- (Default = "32768" // 32KB)
- </description>
- </property>
- <property>
- <name>storage.buffercache.size</name>
- <value>33554432</value>
- <description>The size of memory allocated to the disk buffer cache.
- The value should be a multiple of the buffer cache page size(Default
- = "33554432" // 32MB)
- </description>
- </property>
- <property>
- <name>storage.memorycomponent.numpages</name>
- <value>8</value>
- <description>The number of pages to allocate for a memory component.
- (Default = 8)
- </description>
- </property>
- <property>
- <name>plot.activate</name>
- <value>false</value>
- <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
- </description>
- </property>
-</asterixConfiguration>
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/test/resources/conf/cluster.xml
----------------------------------------------------------------------
diff --git a/src/test/resources/conf/cluster.xml b/src/test/resources/conf/cluster.xml
deleted file mode 100644
index 8f0b694..0000000
--- a/src/test/resources/conf/cluster.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<cluster xmlns="cluster">
- <instance_name>asterix</instance_name>
- <store>storage</store>
-
- <data_replication>
- <enabled>false</enabled>
- <replication_port>2016</replication_port>
- <replication_factor>2</replication_factor>
- <auto_failover>false</auto_failover>
- <replication_time_out>30</replication_time_out>
- </data_replication>
-
- <master_node>
- <id>master</id>
- <client_ip>127.0.0.1</client_ip>
- <cluster_ip>127.0.0.1</cluster_ip>
- <client_port>1098</client_port>
- <cluster_port>1099</cluster_port>
- <http_port>8888</http_port>
- </master_node>
- <node>
- <id>nc1</id>
- <cluster_ip>127.0.0.1</cluster_ip>
- <replication_port>2016</replication_port>
- </node>
- <node>
- <id>nc2</id>
- <cluster_ip>127.0.0.1</cluster_ip>
- <replication_port>2017</replication_port>
- </node>
-</cluster>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/src/test/resources/conf/hyracks-deployment.properties
----------------------------------------------------------------------
diff --git a/src/test/resources/conf/hyracks-deployment.properties b/src/test/resources/conf/hyracks-deployment.properties
deleted file mode 100644
index 17a6772..0000000
--- a/src/test/resources/conf/hyracks-deployment.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-#/*
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl
-nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl
-cc.ip=127.0.0.1
-cc.port=1098