You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/01/14 21:31:56 UTC
[02/26] incubator-asterixdb git commit: Feed Fixes and Cleanup
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
new file mode 100644
index 0000000..5b5f19f
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.feeds;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
+import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.constraints.Constraint;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
+import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+
+/**
+ * A utility class for providing helper functions for feeds
+ * TODO: Refactor this class.
+ */
+public class FeedMetadataUtil {
+
+ private static Logger LOGGER = Logger.getLogger(FeedMetadataUtil.class.getName());
+
+ private static class LocationConstraint {
+ int partition;
+ String location;
+ }
+
+ public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
+ throws AsterixException {
+ Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Unknown target dataset :" + datasetName);
+ }
+
+ if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
+ throw new AsterixException("Statement not applicable. Dataset " + datasetName + " is not of required type "
+ + DatasetType.INTERNAL);
+ }
+ return dataset;
+ }
+
+ public static Feed validateIfFeedExists(String dataverse, String feedName, MetadataTransactionContext ctx)
+ throws MetadataException, AsterixException {
+ Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverse, feedName);
+ if (feed == null) {
+ throw new AsterixException("Unknown source feed: " + feedName);
+ }
+ return feed;
+ }
+
+ public static FeedPolicyEntity validateIfPolicyExists(String dataverse, String policyName,
+ MetadataTransactionContext ctx) throws AsterixException {
+ FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
+ if (feedPolicy == null) {
+ feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ policyName);
+ if (feedPolicy == null) {
+ throw new AsterixException("Unknown feed policy" + policyName);
+ }
+ }
+ return feedPolicy;
+ }
+
+ public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
+ FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) {
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Original Job Spec:" + spec);
+ }
+
+ JobSpecification altered = new JobSpecification(spec.getFrameSize());
+ Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
+ boolean preProcessingRequired = preProcessingRequired(feedConnectionId);
+ // copy operators
+ String operandId = null;
+ Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+ FeedMetaOperatorDescriptor metaOp = null;
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
+ operandId = FeedRuntimeId.DEFAULT_OPERAND_ID;
+ IOperatorDescriptor opDesc = entry.getValue();
+ if (opDesc instanceof FeedCollectOperatorDescriptor) {
+ FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc;
+ FeedCollectOperatorDescriptor fiop = new FeedCollectOperatorDescriptor(altered,
+ orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(),
+ orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
+ oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
+ } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
+ operandId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+ metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+ FeedRuntimeType.STORE, false, operandId);
+ oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ } else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
+ operandId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+ metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+ FeedRuntimeType.STORE, false, operandId);
+ oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+
+ } else {
+ FeedRuntimeType runtimeType = null;
+ boolean enableSubscriptionMode = false;
+ boolean createMetaOp = true;
+ OperatorDescriptorId opId = null;
+ if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
+ IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
+ .getRuntimeFactories()[0];
+ if (runtimeFactory instanceof AssignRuntimeFactory) {
+ IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId())
+ .get(0);
+ IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc);
+ if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+ runtimeType = preProcessingRequired ? FeedRuntimeType.COMPUTE : FeedRuntimeType.OTHER;
+ enableSubscriptionMode = preProcessingRequired;
+ } else {
+ runtimeType = FeedRuntimeType.OTHER;
+ }
+ } else if (runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) {
+ runtimeType = FeedRuntimeType.ETS;
+ } else {
+ runtimeType = FeedRuntimeType.OTHER;
+ }
+ } else {
+ if (opDesc instanceof AbstractSingleActivityOperatorDescriptor) {
+ runtimeType = FeedRuntimeType.OTHER;
+ } else {
+ opId = altered.createOperatorDescriptorId(opDesc);
+ createMetaOp = false;
+ }
+ }
+ if (createMetaOp) {
+ metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+ runtimeType, enableSubscriptionMode, operandId);
+ opId = metaOp.getOperatorId();
+ }
+ oldNewOID.put(opDesc.getOperatorId(), opId);
+ }
+ }
+
+ // copy connectors
+ Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
+ for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
+ IConnectorDescriptor connDesc = entry.getValue();
+ ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
+ connectorMapping.put(entry.getKey(), newConnId);
+ }
+
+ // make connections between operators
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
+ .getConnectorOperatorMap().entrySet()) {
+ IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
+ Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
+ Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
+
+ IOperatorDescriptor leftOpDesc = altered.getOperatorMap()
+ .get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
+ IOperatorDescriptor rightOpDesc = altered.getOperatorMap()
+ .get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
+
+ altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
+ }
+
+ // prepare for setting partition constraints
+ Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<OperatorDescriptorId, List<LocationConstraint>>();
+ Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
+
+ for (Constraint constraint : spec.getUserConstraints()) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ OperatorDescriptorId opId;
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+
+ IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
+ List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
+ if (locations == null) {
+ locations = new ArrayList<>();
+ operatorLocations.put(opDesc.getOperatorId(), locations);
+ }
+ String location = (String) ((ConstantExpression) cexpr).getValue();
+ LocationConstraint lc = new LocationConstraint();
+ lc.location = location;
+ lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
+ locations.add(lc);
+ break;
+ default:
+ break;
+ }
+ }
+
+ // set absolute location constraints
+ for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
+ IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
+ Collections.sort(entry.getValue(), new Comparator<LocationConstraint>() {
+
+ @Override
+ public int compare(LocationConstraint o1, LocationConstraint o2) {
+ return o1.partition - o2.partition;
+ }
+ });
+ String[] locations = new String[entry.getValue().size()];
+ for (int i = 0; i < locations.length; ++i) {
+ locations[i] = entry.getValue().get(i).location;
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations);
+ }
+
+ // set count constraints
+ for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
+ IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
+ if (!operatorLocations.keySet().contains(entry.getKey())) {
+ PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
+ }
+ }
+
+ // useConnectorSchedulingPolicy
+ altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+
+ // connectorAssignmentPolicy
+ altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+
+ // roots
+ for (OperatorDescriptorId root : spec.getRoots()) {
+ altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
+ }
+
+ // jobEventListenerFactory
+ altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("New Job Spec:" + altered);
+ }
+
+ return altered;
+
+ }
+
+ public static void increaseCardinality(JobSpecification spec, FeedRuntimeType compute, int requiredCardinality,
+ List<String> newLocations) throws AsterixException {
+ IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc,
+ nChooseK(requiredCardinality, newLocations));
+
+ }
+
+ public static void decreaseComputeCardinality(JobSpecification spec, FeedRuntimeType compute,
+ int requiredCardinality, List<String> currentLocations) throws AsterixException {
+ IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
+ String[] chosenLocations = nChooseK(requiredCardinality, currentLocations);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc, chosenLocations);
+ }
+
+ private static IOperatorDescriptor alterJobSpecForComputeCardinality(JobSpecification spec, int requiredCardinality)
+ throws AsterixException {
+ Map<ConnectorDescriptorId, IConnectorDescriptor> connectors = spec.getConnectorMap();
+ Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
+ .getConnectorOperatorMap();
+
+ IOperatorDescriptor sourceOp = null;
+ IOperatorDescriptor targetOp = null;
+ IConnectorDescriptor connDesc = null;
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
+ .entrySet()) {
+ ConnectorDescriptorId cid = entry.getKey();
+ sourceOp = entry.getValue().getKey().getKey();
+ if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+ targetOp = entry.getValue().getValue().getKey();
+ if (targetOp instanceof FeedMetaOperatorDescriptor
+ && (((FeedMetaOperatorDescriptor) targetOp).getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
+ connDesc = connectors.get(cid);
+ break;
+ } else {
+ throw new AsterixException("Incorrect manipulation, feed does not have a compute stage");
+ }
+ }
+ }
+
+ Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorInputMap = spec.getOperatorInputMap();
+ boolean removed = operatorInputMap.get(targetOp.getOperatorId()).remove(connDesc);
+ if (!removed) {
+ throw new AsterixException("Connector desc not found");
+ }
+ Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorOutputMap = spec.getOperatorOutputMap();
+ removed = operatorOutputMap.get(sourceOp.getOperatorId()).remove(connDesc);
+ if (!removed) {
+ throw new AsterixException("Connector desc not found");
+ }
+ spec.getConnectorMap().remove(connDesc.getConnectorId());
+ connectorOpMap.remove(connDesc.getConnectorId());
+
+ ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(requiredCardinality);
+ MToNPartitioningConnectorDescriptor newConnector = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ spec.getConnectorMap().put(newConnector.getConnectorId(), newConnector);
+ spec.connect(newConnector, sourceOp, 0, targetOp, 0);
+
+ // ==============================================================================
+ Set<Constraint> userConstraints = spec.getUserConstraints();
+ Constraint countConstraint = null;
+ Constraint locationConstraint = null;
+ List<LocationConstraint> locations = new ArrayList<LocationConstraint>();
+ IOperatorDescriptor changingOpDesc = null;
+
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ OperatorDescriptorId opId;
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT: {
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
+ if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
+ countConstraint = constraint;
+ changingOpDesc = opDesc;
+ }
+ }
+ break;
+ }
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
+ if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
+ locationConstraint = constraint;
+ changingOpDesc = opDesc;
+ String location = (String) ((ConstantExpression) cexpr).getValue();
+ LocationConstraint lc = new LocationConstraint();
+ lc.location = location;
+ lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
+ locations.add(lc);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ userConstraints.remove(countConstraint);
+ if (locationConstraint != null) {
+ userConstraints.remove(locationConstraint);
+ }
+
+ return changingOpDesc;
+ }
+
+ private static String[] nChooseK(int k, List<String> locations) {
+ String[] result = new String[k];
+ for (int i = 0; i < k; i++) {
+ result[i] = locations.get(i);
+ }
+ return result;
+ }
+
+ private static boolean preProcessingRequired(FeedConnectionId connectionId) {
+ MetadataTransactionContext ctx = null;
+ Feed feed = null;
+ boolean preProcessingRequired = false;
+ try {
+ MetadataManager.INSTANCE.acquireReadLatch();
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(),
+ connectionId.getFeedId().getFeedName());
+ preProcessingRequired = feed.getAppliedFunction() != null;
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception abortException) {
+ e.addSuppressed(abortException);
+ throw new IllegalStateException(e);
+ }
+ }
+ } finally {
+ MetadataManager.INSTANCE.releaseReadLatch();
+ }
+ return preProcessingRequired;
+ }
+
+ public static Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> getPrimaryFeedFactoryAndOutput(
+ Feed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException {
+ // This method needs to be re-visited
+ String adapterName = null;
+ DatasourceAdapter adapterEntity = null;
+ String adapterFactoryClassname = null;
+ IAdapterFactory adapterFactory = null;
+ ARecordType adapterOutputType = null;
+ Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> feedProps = null;
+ IDataSourceAdapter.AdapterType adapterType = null;
+ try {
+ adapterName = feed.getAdapterName();
+ Map<String, String> configuration = feed.getAdapterConfiguration();
+ configuration.putAll(policyAccessor.getFeedPolicy());
+ adapterOutputType = getOutputType(feed, configuration);
+ ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
+ // Get adapter from metadata dataset <Metadata dataverse>
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ adapterName);
+ // Get adapter from metadata dataset <The feed dataverse>
+ if (adapterEntity == null) {
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
+ }
+
+ if (adapterEntity != null) {
+ adapterType = adapterEntity.getType();
+ adapterFactoryClassname = adapterEntity.getClassname();
+ switch (adapterType) {
+ case INTERNAL:
+ adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ break;
+ case EXTERNAL:
+ String[] anameComponents = adapterName.split("#");
+ String libraryName = anameComponents[0];
+ ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(),
+ libraryName);
+ adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+ break;
+ }
+ adapterFactory.configure(configuration, adapterOutputType);
+ } else {
+ adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
+ adapterOutputType);
+ adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
+ }
+ feedProps = new Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType>(adapterFactory,
+ adapterOutputType, adapterType);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to create adapter " + e);
+ }
+ return feedProps;
+ }
+
+ private static ARecordType getOutputType(Feed feed, Map<String, String> configuration) throws Exception {
+ ARecordType outputType = null;
+ String fqOutputType = configuration.get(ExternalDataConstants.KEY_TYPE_NAME);
+
+ if (fqOutputType == null) {
+ throw new IllegalArgumentException("No output type specified");
+ }
+ String[] dataverseAndType = fqOutputType.split("[.]");
+ String dataverseName;
+ String datatypeName;
+
+ if (dataverseAndType.length == 1) {
+ datatypeName = dataverseAndType[0];
+ dataverseName = feed.getDataverseName();
+ } else if (dataverseAndType.length == 2) {
+ dataverseName = dataverseAndType[0];
+ datatypeName = dataverseAndType[1];
+ } else
+ throw new IllegalArgumentException(
+ "Invalid value for the parameter " + ExternalDataConstants.KEY_TYPE_NAME);
+
+ MetadataTransactionContext ctx = null;
+ MetadataManager.INSTANCE.acquireReadLatch();
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
+ IAType type = t.getDatatype();
+ if (type.getTypeTag() != ATypeTag.RECORD) {
+ throw new IllegalStateException();
+ }
+ outputType = (ARecordType) t.getDatatype();
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ }
+ throw e;
+ } finally {
+ MetadataManager.INSTANCE.releaseReadLatch();
+ }
+ return outputType;
+ }
+
+ public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor,
+ MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+ String outputType = null;
+ String primaryFeedName = feed.getSourceFeedName();
+ Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
+ FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
+ if (appliedFunction == null) {
+ Triple<IAdapterFactory, ARecordType, IDataSourceAdapter.AdapterType> result = getPrimaryFeedFactoryAndOutput(
+ primaryFeed, policyAccessor, mdTxnCtx);
+ outputType = result.second.getTypeName();
+ } else {
+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+ if (function != null) {
+ if (function.getLanguage().equals(Function.LANGUAGE_AQL)) {
+ throw new NotImplementedException(
+ "Secondary feeds derived from a source feed that has an applied AQL function are not supported yet.");
+ } else {
+ outputType = function.getReturnType();
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Function " + appliedFunction + " associated with source feed not found in Metadata.");
+ }
+ }
+ return outputType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedSubscriptionManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedSubscriptionManager.java
deleted file mode 100644
index b928e55..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedSubscriptionManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
-import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
-
-public class FeedSubscriptionManager implements IFeedSubscriptionManager {
-
- private static Logger LOGGER = Logger.getLogger(FeedSubscriptionManager.class.getName());
-
- private final String nodeId;
-
- private final Map<SubscribableFeedRuntimeId, ISubscribableRuntime> subscribableRuntimes;
-
- public FeedSubscriptionManager(String nodeId) {
- this.nodeId = nodeId;
- this.subscribableRuntimes = new HashMap<SubscribableFeedRuntimeId, ISubscribableRuntime>();
- }
-
- @Override
- public void registerFeedSubscribableRuntime(ISubscribableRuntime subscribableRuntime) {
- SubscribableFeedRuntimeId sid = (SubscribableFeedRuntimeId) subscribableRuntime.getRuntimeId();
- if (!subscribableRuntimes.containsKey(sid)) {
- subscribableRuntimes.put(sid, subscribableRuntime);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Registered feed subscribable runtime " + subscribableRuntime);
- }
- } else {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Feed ingestion runtime " + subscribableRuntime + " already registered.");
- }
- }
- }
-
- @Override
- public ISubscribableRuntime getSubscribableRuntime(SubscribableFeedRuntimeId subscribableFeedRuntimeId) {
- return subscribableRuntimes.get(subscribableFeedRuntimeId);
- }
-
- @Override
- public void deregisterFeedSubscribableRuntime(SubscribableFeedRuntimeId ingestionId) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("De-registered feed subscribable runtime " + ingestionId);
- }
- subscribableRuntimes.remove(ingestionId);
- }
-
- @Override
- public String toString() {
- return "IngestionManager [" + nodeId + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
deleted file mode 100644
index 5ed2876..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
-import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedPolicyAccessor;
-import org.apache.asterix.common.feeds.FeedRuntimeId;
-import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
-import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.library.ExternalLibraryManager;
-import org.apache.asterix.external.provider.AdapterFactoryProvider;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.bootstrap.MetadataConstants;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedPolicy;
-import org.apache.asterix.metadata.entities.Function;
-import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.entities.SecondaryFeed;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
-import org.apache.hyracks.api.constraints.Constraint;
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
-import org.apache.hyracks.api.constraints.expressions.ConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
-import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
-import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
-import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-
-/**
- * A utility class for providing helper functions for feeds
- */
-public class FeedUtil {
-
- private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
-
- public static String getFeedPointKeyRep(Feed feed, List<String> appliedFunctions) {
- StringBuilder builder = new StringBuilder();
- builder.append(feed.getDataverseName() + ":");
- builder.append(feed.getFeedName() + ":");
- if (appliedFunctions != null && !appliedFunctions.isEmpty()) {
- for (String function : appliedFunctions) {
- builder.append(function + ":");
- }
- builder.deleteCharAt(builder.length() - 1);
- }
- return builder.toString();
- }
-
- private static class LocationConstraint {
- int partition;
- String location;
- }
-
- public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
- throws AsterixException {
- Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
- if (dataset == null) {
- throw new AsterixException("Unknown target dataset :" + datasetName);
- }
-
- if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
- throw new AsterixException("Statement not applicable. Dataset " + datasetName + " is not of required type "
- + DatasetType.INTERNAL);
- }
- return dataset;
- }
-
- public static Feed validateIfFeedExists(String dataverse, String feedName, MetadataTransactionContext ctx)
- throws MetadataException, AsterixException {
- Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverse, feedName);
- if (feed == null) {
- throw new AsterixException("Unknown source feed: " + feedName);
- }
- return feed;
- }
-
- public static FeedPolicy validateIfPolicyExists(String dataverse, String policyName, MetadataTransactionContext ctx)
- throws AsterixException {
- FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
- if (feedPolicy == null) {
- feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
- policyName);
- if (feedPolicy == null) {
- throw new AsterixException("Unknown feed policy" + policyName);
- }
- }
- return feedPolicy;
- }
-
- public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
- FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) {
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Original Job Spec:" + spec);
- }
-
- JobSpecification altered = new JobSpecification(spec.getFrameSize());
- Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
- boolean preProcessingRequired = preProcessingRequired(feedConnectionId);
- // copy operators
- String operandId = null;
- Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
- FeedMetaOperatorDescriptor metaOp = null;
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
- operandId = FeedRuntimeId.DEFAULT_OPERAND_ID;
- IOperatorDescriptor opDesc = entry.getValue();
- if (opDesc instanceof FeedCollectOperatorDescriptor) {
- FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc;
- FeedCollectOperatorDescriptor fiop = new FeedCollectOperatorDescriptor(altered,
- orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(),
- orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
- oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
- } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
- operandId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
- metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
- FeedRuntimeType.STORE, false, operandId);
- oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
- } else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
- operandId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
- metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
- FeedRuntimeType.STORE, false, operandId);
- oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
-
- } else {
- FeedRuntimeType runtimeType = null;
- boolean enableSubscriptionMode = false;
- boolean createMetaOp = true;
- OperatorDescriptorId opId = null;
- if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
- IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
- .getRuntimeFactories()[0];
- if (runtimeFactory instanceof AssignRuntimeFactory) {
- IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId())
- .get(0);
- IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc);
- if (sourceOp instanceof FeedCollectOperatorDescriptor) {
- runtimeType = preProcessingRequired ? FeedRuntimeType.COMPUTE : FeedRuntimeType.OTHER;
- enableSubscriptionMode = preProcessingRequired;
- } else {
- runtimeType = FeedRuntimeType.OTHER;
- }
- } else if (runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) {
- runtimeType = FeedRuntimeType.ETS;
- } else {
- runtimeType = FeedRuntimeType.OTHER;
- }
- } else {
- if (opDesc instanceof AbstractSingleActivityOperatorDescriptor) {
- runtimeType = FeedRuntimeType.OTHER;
- } else {
- opId = altered.createOperatorDescriptorId(opDesc);
- createMetaOp = false;
- }
- }
- if (createMetaOp) {
- metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
- runtimeType, enableSubscriptionMode, operandId);
- opId = metaOp.getOperatorId();
- }
- oldNewOID.put(opDesc.getOperatorId(), opId);
- }
- }
-
- // copy connectors
- Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
- for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
- IConnectorDescriptor connDesc = entry.getValue();
- ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
- connectorMapping.put(entry.getKey(), newConnId);
- }
-
- // make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
- .getConnectorOperatorMap().entrySet()) {
- IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
- Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
- Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
-
- IOperatorDescriptor leftOpDesc = altered.getOperatorMap()
- .get(oldNewOID.get(leftOp.getLeft().getOperatorId()));
- IOperatorDescriptor rightOpDesc = altered.getOperatorMap()
- .get(oldNewOID.get(rightOp.getLeft().getOperatorId()));
-
- altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
- }
-
- // prepare for setting partition constraints
- Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<OperatorDescriptorId, List<LocationConstraint>>();
- Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
-
- for (Constraint constraint : spec.getUserConstraints()) {
- LValueConstraintExpression lexpr = constraint.getLValue();
- ConstraintExpression cexpr = constraint.getRValue();
- OperatorDescriptorId opId;
- switch (lexpr.getTag()) {
- case PARTITION_COUNT:
- opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
- operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
- break;
- case PARTITION_LOCATION:
- opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-
- IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
- List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
- if (locations == null) {
- locations = new ArrayList<>();
- operatorLocations.put(opDesc.getOperatorId(), locations);
- }
- String location = (String) ((ConstantExpression) cexpr).getValue();
- LocationConstraint lc = new LocationConstraint();
- lc.location = location;
- lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
- locations.add(lc);
- break;
- default:
- break;
- }
- }
-
- // set absolute location constraints
- for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
- IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
- Collections.sort(entry.getValue(), new Comparator<LocationConstraint>() {
-
- @Override
- public int compare(LocationConstraint o1, LocationConstraint o2) {
- return o1.partition - o2.partition;
- }
- });
- String[] locations = new String[entry.getValue().size()];
- for (int i = 0; i < locations.length; ++i) {
- locations[i] = entry.getValue().get(i).location;
- }
- PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations);
- }
-
- // set count constraints
- for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
- IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
- if (!operatorLocations.keySet().contains(entry.getKey())) {
- PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
- }
- }
-
- // useConnectorSchedulingPolicy
- altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
-
- // connectorAssignmentPolicy
- altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
-
- // roots
- for (OperatorDescriptorId root : spec.getRoots()) {
- altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
- }
-
- // jobEventListenerFactory
- altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("New Job Spec:" + altered);
- }
-
- return altered;
-
- }
-
- public static void increaseCardinality(JobSpecification spec, FeedRuntimeType compute, int requiredCardinality,
- List<String> newLocations) throws AsterixException {
- IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc,
- nChooseK(requiredCardinality, newLocations));
-
- }
-
- public static void decreaseComputeCardinality(JobSpecification spec, FeedRuntimeType compute,
- int requiredCardinality, List<String> currentLocations) throws AsterixException {
- IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
- String[] chosenLocations = nChooseK(requiredCardinality, currentLocations);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc, chosenLocations);
- }
-
- private static IOperatorDescriptor alterJobSpecForComputeCardinality(JobSpecification spec, int requiredCardinality)
- throws AsterixException {
- Map<ConnectorDescriptorId, IConnectorDescriptor> connectors = spec.getConnectorMap();
- Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
- .getConnectorOperatorMap();
-
- IOperatorDescriptor sourceOp = null;
- IOperatorDescriptor targetOp = null;
- IConnectorDescriptor connDesc = null;
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
- .entrySet()) {
- ConnectorDescriptorId cid = entry.getKey();
- sourceOp = entry.getValue().getKey().getKey();
- if (sourceOp instanceof FeedCollectOperatorDescriptor) {
- targetOp = entry.getValue().getValue().getKey();
- if (targetOp instanceof FeedMetaOperatorDescriptor
- && (((FeedMetaOperatorDescriptor) targetOp).getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
- connDesc = connectors.get(cid);
- break;
- } else {
- throw new AsterixException("Incorrect manipulation, feed does not have a compute stage");
- }
- }
- }
-
- Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorInputMap = spec.getOperatorInputMap();
- boolean removed = operatorInputMap.get(targetOp.getOperatorId()).remove(connDesc);
- if (!removed) {
- throw new AsterixException("Connector desc not found");
- }
- Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorOutputMap = spec.getOperatorOutputMap();
- removed = operatorOutputMap.get(sourceOp.getOperatorId()).remove(connDesc);
- if (!removed) {
- throw new AsterixException("Connector desc not found");
- }
- spec.getConnectorMap().remove(connDesc.getConnectorId());
- connectorOpMap.remove(connDesc.getConnectorId());
-
- ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(requiredCardinality);
- MToNPartitioningConnectorDescriptor newConnector = new MToNPartitioningConnectorDescriptor(spec, tpcf);
- spec.getConnectorMap().put(newConnector.getConnectorId(), newConnector);
- spec.connect(newConnector, sourceOp, 0, targetOp, 0);
-
- // ==============================================================================
- Set<Constraint> userConstraints = spec.getUserConstraints();
- Constraint countConstraint = null;
- Constraint locationConstraint = null;
- List<LocationConstraint> locations = new ArrayList<LocationConstraint>();
- IOperatorDescriptor changingOpDesc = null;
-
- for (Constraint constraint : userConstraints) {
- LValueConstraintExpression lexpr = constraint.getLValue();
- ConstraintExpression cexpr = constraint.getRValue();
- OperatorDescriptorId opId;
- switch (lexpr.getTag()) {
- case PARTITION_COUNT: {
- opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
- IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
- if (opDesc instanceof FeedMetaOperatorDescriptor) {
- FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
- if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
- countConstraint = constraint;
- changingOpDesc = opDesc;
- }
- }
- break;
- }
- case PARTITION_LOCATION:
- opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
- IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
- if (opDesc instanceof FeedMetaOperatorDescriptor) {
- FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
- if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
- locationConstraint = constraint;
- changingOpDesc = opDesc;
- String location = (String) ((ConstantExpression) cexpr).getValue();
- LocationConstraint lc = new LocationConstraint();
- lc.location = location;
- lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
- locations.add(lc);
- }
- }
- break;
- default:
- break;
- }
- }
-
- userConstraints.remove(countConstraint);
- if (locationConstraint != null) {
- userConstraints.remove(locationConstraint);
- }
-
- return changingOpDesc;
- }
-
- private static String[] nChooseK(int k, List<String> locations) {
- String[] result = new String[k];
- for (int i = 0; i < k; i++) {
- result[i] = locations.get(i);
- }
- return result;
- }
-
- private static boolean preProcessingRequired(FeedConnectionId connectionId) {
- MetadataTransactionContext ctx = null;
- Feed feed = null;
- boolean preProcessingRequired = false;
- try {
- MetadataManager.INSTANCE.acquireReadLatch();
- ctx = MetadataManager.INSTANCE.beginTransaction();
- feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(),
- connectionId.getFeedId().getFeedName());
- preProcessingRequired = feed.getAppliedFunction() != null;
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e) {
- if (ctx != null) {
- try {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- } catch (Exception abortException) {
- e.addSuppressed(abortException);
- throw new IllegalStateException(e);
- }
- }
- } finally {
- MetadataManager.INSTANCE.releaseReadLatch();
- }
- return preProcessingRequired;
- }
-
- public static Triple<IAdapterFactory, ARecordType, AdapterType> getPrimaryFeedFactoryAndOutput(PrimaryFeed feed,
- FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
-
- String adapterName = null;
- DatasourceAdapter adapterEntity = null;
- String adapterFactoryClassname = null;
- IAdapterFactory adapterFactory = null;
- ARecordType adapterOutputType = null;
- Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
- AdapterType adapterType = null;
- try {
- adapterName = feed.getAdaptorName();
- Map<String, String> configuration = feed.getAdaptorConfiguration();
- configuration.putAll(policyAccessor.getFeedPolicy());
- adapterOutputType = getOutputType(feed, configuration);
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
- adapterName);
- if (adapterEntity == null) {
- adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
- }
- if (adapterEntity != null) {
- adapterType = adapterEntity.getType();
- adapterFactoryClassname = adapterEntity.getClassname();
- switch (adapterType) {
- case INTERNAL:
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- break;
- case EXTERNAL:
- String[] anameComponents = adapterName.split("#");
- String libraryName = anameComponents[0];
- ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(),
- libraryName);
- adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
- break;
- }
- adapterFactory.configure(configuration, adapterOutputType);
- } else {
- configuration.put(ExternalDataConstants.KEY_DATAVERSE, feed.getDataverseName());
- adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration,
- adapterOutputType);
- adapterType = AdapterType.INTERNAL;
- }
- feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
- adapterType);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to create adapter " + e);
- }
- return feedProps;
- }
-
- private static ARecordType getOutputType(PrimaryFeed feed, Map<String, String> configuration) throws Exception {
- ARecordType outputType = null;
- String fqOutputType = configuration.get(ExternalDataConstants.KEY_TYPE_NAME);
-
- if (fqOutputType == null) {
- throw new IllegalArgumentException("No output type specified");
- }
- String[] dataverseAndType = fqOutputType.split("[.]");
- String dataverseName;
- String datatypeName;
-
- if (dataverseAndType.length == 1) {
- datatypeName = dataverseAndType[0];
- dataverseName = feed.getDataverseName();
- } else if (dataverseAndType.length == 2) {
- dataverseName = dataverseAndType[0];
- datatypeName = dataverseAndType[1];
- } else
- throw new IllegalArgumentException(
- "Invalid value for the parameter " + ExternalDataConstants.KEY_TYPE_NAME);
-
- MetadataTransactionContext ctx = null;
- MetadataManager.INSTANCE.acquireReadLatch();
- try {
- ctx = MetadataManager.INSTANCE.beginTransaction();
- Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
- IAType type = t.getDatatype();
- if (type.getTypeTag() != ATypeTag.RECORD) {
- throw new IllegalStateException();
- }
- outputType = (ARecordType) t.getDatatype();
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e) {
- if (ctx != null) {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- }
- throw e;
- } finally {
- MetadataManager.INSTANCE.releaseReadLatch();
- }
- return outputType;
- }
-
- public static String getSecondaryFeedOutput(SecondaryFeed feed, FeedPolicyAccessor policyAccessor,
- MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
- String outputType = null;
- String primaryFeedName = feed.getSourceFeedName();
- Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
- FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
- if (appliedFunction == null) {
- Triple<IAdapterFactory, ARecordType, AdapterType> result = getPrimaryFeedFactoryAndOutput(
- (PrimaryFeed) primaryFeed, policyAccessor, mdTxnCtx);
- outputType = result.second.getTypeName();
- } else {
- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
- if (function != null) {
- if (function.getLanguage().equals(Function.LANGUAGE_AQL)) {
- throw new NotImplementedException(
- "Secondary feeds derived from a source feed that has an applied AQL function are not supported yet.");
- } else {
- outputType = function.getReturnType();
- }
- } else {
- throw new IllegalArgumentException(
- "Function " + appliedFunction + " associated with source feed not found in Metadata.");
- }
- }
- return outputType;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedWorkManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedWorkManager.java
deleted file mode 100644
index be12ff0..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedWorkManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.asterix.common.feeds.api.IFeedWork;
-import org.apache.asterix.common.feeds.api.IFeedWorkEventListener;
-import org.apache.asterix.common.feeds.api.IFeedWorkManager;
-
-/**
- * Handles asynchronous execution of feed management related tasks.
- */
-public class FeedWorkManager implements IFeedWorkManager {
-
- public static final FeedWorkManager INSTANCE = new FeedWorkManager();
-
- private final ExecutorService executorService = Executors.newCachedThreadPool();
-
- private FeedWorkManager() {
- }
-
- public void submitWork(IFeedWork work, IFeedWorkEventListener listener) {
- Runnable runnable = work.getRunnable();
- try {
- executorService.execute(runnable);
- listener.workCompleted(work);
- } catch (Exception e) {
- listener.workFailed(work, e);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IAdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IAdapterExecutor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IAdapterExecutor.java
deleted file mode 100644
index ff641af..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IAdapterExecutor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-
-public interface IAdapterExecutor {
-
- /**
- * @throws Exception
- */
- public void start() throws Exception;
-
- /**
- * @throws Exception
- */
- public void stop() throws Exception;
-
- /**
- * @return
- */
- public FeedConnectionId getFeedId();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedMessage.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedMessage.java
deleted file mode 100644
index 9180671..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedMessage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-public interface IFeedMessage extends Serializable {
-
- public enum MessageType {
- END,
- SUPER_FEED_MANAGER_ELECT
- }
-
- public MessageType getMessageType();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java
deleted file mode 100644
index f35c21f..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.util.Map;
-
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-
-public interface ITypedAdapterFactory extends IAdapterFactory {
-
- public ARecordType getAdapterOutputType();
-
- public void configure(Map<String, String> configuration) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MessageListener.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MessageListener.java
deleted file mode 100644
index 650cb92..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/MessageListener.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class MessageListener {
-
- private static final Logger LOGGER = Logger.getLogger(MessageListener.class.getName());
-
- private final int port;
- private final LinkedBlockingQueue<String> outbox;
-
- private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- private MessageListenerServer listenerServer;
-
- public MessageListener(int port, LinkedBlockingQueue<String> outbox) {
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- listenerServer.stop();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Stopped message service at " + port);
- }
- if (!executorService.isShutdown()) {
- executorService.shutdownNow();
- }
-
- }
-
- public void start() throws IOException {
- listenerServer = new MessageListenerServer(port, outbox);
- executorService.execute(listenerServer);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting message service at " + port);
- }
- }
-
- private static class MessageListenerServer implements Runnable {
-
- private final int port;
- private final LinkedBlockingQueue<String> outbox;
- private ServerSocket server;
-
- public MessageListenerServer(int port, LinkedBlockingQueue<String> outbox) {
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- char EOL = (char) "\n".getBytes()[0];
- Socket client = null;
- try {
- server = new ServerSocket(port);
- client = server.accept();
- InputStream in = client.getInputStream();
- CharBuffer buffer = CharBuffer.allocate(5000);
- char ch;
- while (true) {
- ch = (char) in.read();
- if (((int) ch) == -1) {
- break;
- }
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- synchronized (outbox) {
- outbox.add(s + "\n");
- }
- buffer.position(0);
- buffer.limit(5000);
- }
-
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start Message listener" + server);
- }
- } finally {
- if (server != null) {
- try {
- server.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- }
-
- }
-
- public static interface IMessageAnalyzer {
-
- public LinkedBlockingQueue<String> getMessageQueue();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/PrepareStallMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/PrepareStallMessage.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/PrepareStallMessage.java
deleted file mode 100644
index 3ca1147..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/PrepareStallMessage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.asterix.common.feeds.FeedConnectionId;
-import org.apache.asterix.common.feeds.FeedConstants;
-import org.apache.asterix.common.feeds.message.FeedMessage;
-
-/**
- * A feed control message indicating the need to end the feed. This message is dispatched
- * to all locations that host an operator involved in the feed pipeline.
- */
-public class PrepareStallMessage extends FeedMessage {
-
- private static final long serialVersionUID = 1L;
-
- private final FeedConnectionId connectionId;
-
- private final int computePartitionsRetainLimit;
-
- public PrepareStallMessage(FeedConnectionId connectionId, int computePartitionsRetainLimit) {
- super(MessageType.PREPARE_STALL);
- this.connectionId = connectionId;
- this.computePartitionsRetainLimit = computePartitionsRetainLimit;
- }
-
- @Override
- public String toString() {
- return MessageType.PREPARE_STALL.name() + " " + connectionId;
- }
-
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject obj = new JSONObject();
- obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
- obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
- obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
- obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
- obj.put(FeedConstants.MessageConstants.COMPUTE_PARTITION_RETAIN_LIMIT, computePartitionsRetainLimit);
- return obj;
- }
-
- public FeedConnectionId getConnectionId() {
- return connectionId;
- }
-
- public int getComputePartitionsRetainLimit() {
- return computePartitionsRetainLimit;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e800e6d5/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java
deleted file mode 100644
index 5c5c068..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/RemoteSocketMessageListener.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.feeds;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.nio.CharBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public class RemoteSocketMessageListener {
-
- private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());
-
- private final String host;
- private final int port;
- private final LinkedBlockingQueue<String> outbox;
- private final ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- private RemoteMessageListenerServer listenerServer;
-
- public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
- this.host = host;
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- if (!executorService.isShutdown()) {
- executorService.shutdownNow();
- }
- listenerServer.stop();
-
- }
-
- public void start() throws IOException {
- listenerServer = new RemoteMessageListenerServer(host, port, outbox);
- executorService.execute(listenerServer);
- }
-
- private static class RemoteMessageListenerServer implements Runnable {
-
- private final String host;
- private final int port;
- private final LinkedBlockingQueue<String> outbox;
- private Socket client;
-
- public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
- this.host = host;
- this.port = port;
- this.outbox = outbox;
- }
-
- public void stop() {
- try {
- client.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- char EOL = (char) "\n".getBytes()[0];
- Socket client = null;
- try {
- client = new Socket(host, port);
- InputStream in = client.getInputStream();
- CharBuffer buffer = CharBuffer.allocate(5000);
- char ch;
- while (true) {
- ch = (char) in.read();
- if ((ch) == -1) {
- break;
- }
- while (ch != EOL) {
- buffer.put(ch);
- ch = (char) in.read();
- }
- buffer.flip();
- String s = new String(buffer.array());
- synchronized (outbox) {
- outbox.add(s + "\n");
- }
- buffer.position(0);
- buffer.limit(5000);
- }
-
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to start Remote Message listener" + client);
- }
- } finally {
- if (client != null && !client.isClosed()) {
- try {
- client.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
-
- public static interface IMessageAnalyzer {
-
- /**
- * @return
- */
- public LinkedBlockingQueue<String> getMessageQueue();
-
- }
-
-}