You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:00 UTC
[03/24] incubator-asterixdb git commit: Introduces Feeds 2.0
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index c7b905d..ebee738 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -21,35 +21,47 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
+import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.FeedRuntimeId;
+import edu.uci.ics.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Feed;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -61,7 +73,11 @@ import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
/**
* A utility class for providing helper functions for feeds
@@ -70,9 +86,17 @@ public class FeedUtil {
private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
- public static boolean isFeedActive(FeedActivity feedActivity) {
- return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_FAILURE) || feedActivity
- .getActivityType().equals(FeedActivityType.FEED_END)));
+ public static String getFeedPointKeyRep(Feed feed, List<String> appliedFunctions) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(feed.getDataverseName() + ":");
+ builder.append(feed.getFeedName() + ":");
+ if (appliedFunctions != null && !appliedFunctions.isEmpty()) {
+ for (String function : appliedFunctions) {
+ builder.append(function + ":");
+ }
+ builder.deleteCharAt(builder.length() - 1);
+ }
+ return builder.toString();
}
private static class LocationConstraint {
@@ -80,62 +104,113 @@ public class FeedUtil {
String location;
}
+ public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
+ throws AsterixException {
+ Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Unknown target dataset :" + datasetName);
+ }
+
+ if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
+ throw new AsterixException("Statement not applicable. Dataset " + datasetName + " is not of required type "
+ + DatasetType.INTERNAL);
+ }
+ return dataset;
+ }
+
+ public static Feed validateIfFeedExists(String dataverse, String feedName, MetadataTransactionContext ctx)
+ throws MetadataException, AsterixException {
+ Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverse, feedName);
+ if (feed == null) {
+ throw new AsterixException("Unknown source feed: " + feedName);
+ }
+ return feed;
+ }
+
+ public static FeedPolicy validateIfPolicyExists(String dataverse, String policyName, MetadataTransactionContext ctx)
+ throws AsterixException {
+ FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
+ if (feedPolicy == null) {
+ feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
+ policyName);
+ if (feedPolicy == null) {
+ throw new AsterixException("Unknown feed policy" + policyName);
+ }
+ }
+ return feedPolicy;
+ }
+
public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
- FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
+ FeedConnectionId feedConnectionId, Map<String, String> feedPolicyProperties) {
- FeedPolicyAccessor fpa = new FeedPolicyAccessor(feedPolicy.getProperties());
- boolean alterationRequired = (fpa.collectStatistics() || fpa.continueOnApplicationFailure()
- || fpa.continueOnHardwareFailure() || fpa.isElastic());
- if (!alterationRequired) {
- return spec;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Original Job Spec:" + spec);
}
JobSpecification altered = new JobSpecification(spec.getFrameSize());
Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-
+ boolean preProcessingRequired = preProcessingRequired(feedConnectionId);
// copy operators
- String operationId = null;
+ String operandId = null;
Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+ FeedMetaOperatorDescriptor metaOp = null;
for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
- operationId = FeedRuntime.FeedRuntimeId.DEFAULT_OPERATION_ID;
+ operandId = FeedRuntimeId.DEFAULT_OPERAND_ID;
IOperatorDescriptor opDesc = entry.getValue();
- if (opDesc instanceof FeedIntakeOperatorDescriptor) {
- FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
- FeedIntakeOperatorDescriptor fiop;
- if (orig.getAdapterFactory() != null) {
- fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
- (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
- } else {
- fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
- orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
- (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
- }
+ if (opDesc instanceof FeedCollectOperatorDescriptor) {
+ FeedCollectOperatorDescriptor orig = (FeedCollectOperatorDescriptor) opDesc;
+ FeedCollectOperatorDescriptor fiop = new FeedCollectOperatorDescriptor(altered,
+ orig.getFeedConnectionId(), orig.getSourceFeedId(), (ARecordType) orig.getOutputType(),
+ orig.getRecordDescriptor(), orig.getFeedPolicyProperties(), orig.getSubscriptionLocation());
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
- operationId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
- FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, FeedRuntimeType.STORAGE, operationId);
+ operandId = ((AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+ metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+ FeedRuntimeType.STORE, false, operandId);
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
} else if (opDesc instanceof AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) {
- operationId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
- FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, FeedRuntimeType.STORAGE, operationId);
+ operandId = ((AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor) opDesc).getIndexName();
+ metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+ FeedRuntimeType.STORE, false, operandId);
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+
} else {
FeedRuntimeType runtimeType = null;
+ boolean enableSubscriptionMode = false;
+ boolean createMetaOp = true;
+ OperatorDescriptorId opId = null;
if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
.getRuntimeFactories()[0];
if (runtimeFactory instanceof AssignRuntimeFactory) {
- runtimeType = FeedRuntimeType.COMPUTE;
- } else if (runtimeFactory instanceof StreamProjectRuntimeFactory) {
- runtimeType = FeedRuntimeType.COMMIT;
+ IConnectorDescriptor connectorDesc = spec.getOperatorInputMap().get(opDesc.getOperatorId())
+ .get(0);
+ IOperatorDescriptor sourceOp = spec.getProducer(connectorDesc);
+ if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+ runtimeType = preProcessingRequired ? FeedRuntimeType.COMPUTE : FeedRuntimeType.OTHER;
+ enableSubscriptionMode = preProcessingRequired;
+ } else {
+ runtimeType = FeedRuntimeType.OTHER;
+ }
+ } else if (runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) {
+ runtimeType = FeedRuntimeType.ETS;
+ } else {
+ runtimeType = FeedRuntimeType.OTHER;
+ }
+ } else {
+ if (opDesc instanceof AbstractSingleActivityOperatorDescriptor) {
+ runtimeType = FeedRuntimeType.OTHER;
+ } else {
+ opId = altered.createOperatorDescriptorId(opDesc);
+ createMetaOp = false;
}
}
- FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy, runtimeType, operationId);
-
- oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ if (createMetaOp) {
+ metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicyProperties,
+ runtimeType, enableSubscriptionMode, operandId);
+ opId = metaOp.getOperatorId();
+ }
+ oldNewOID.put(opDesc.getOperatorId(), opId);
}
}
@@ -240,35 +315,181 @@ public class FeedUtil {
}
- public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
- MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+ public static void increaseCardinality(JobSpecification spec, FeedRuntimeType compute, int requiredCardinality,
+ List<String> newLocations) throws AsterixException {
+ IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc,
+ nChooseK(requiredCardinality, newLocations));
+
+ }
+
+ public static void decreaseComputeCardinality(JobSpecification spec, FeedRuntimeType compute,
+ int requiredCardinality, List<String> currentLocations) throws AsterixException {
+ IOperatorDescriptor changingOpDesc = alterJobSpecForComputeCardinality(spec, requiredCardinality);
+ String[] chosenLocations = nChooseK(requiredCardinality, currentLocations);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, changingOpDesc, chosenLocations);
+ }
+
+ private static IOperatorDescriptor alterJobSpecForComputeCardinality(JobSpecification spec, int requiredCardinality)
+ throws AsterixException {
+ Map<ConnectorDescriptorId, IConnectorDescriptor> connectors = spec.getConnectorMap();
+ Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
+ .getConnectorOperatorMap();
+
+ IOperatorDescriptor sourceOp = null;
+ IOperatorDescriptor targetOp = null;
+ IConnectorDescriptor connDesc = null;
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
+ .entrySet()) {
+ ConnectorDescriptorId cid = entry.getKey();
+ sourceOp = entry.getValue().getKey().getKey();
+ if (sourceOp instanceof FeedCollectOperatorDescriptor) {
+ targetOp = entry.getValue().getValue().getKey();
+ if (targetOp instanceof FeedMetaOperatorDescriptor
+ && (((FeedMetaOperatorDescriptor) targetOp).getRuntimeType().equals(FeedRuntimeType.COMPUTE))) {
+ connDesc = connectors.get(cid);
+ break;
+ } else {
+ throw new AsterixException("Incorrect manipulation, feed does not have a compute stage");
+ }
+ }
+ }
+
+ Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorInputMap = spec.getOperatorInputMap();
+ boolean removed = operatorInputMap.get(targetOp.getOperatorId()).remove(connDesc);
+ if (!removed) {
+ throw new AsterixException("Connector desc not found");
+ }
+ Map<OperatorDescriptorId, List<IConnectorDescriptor>> operatorOutputMap = spec.getOperatorOutputMap();
+ removed = operatorOutputMap.get(sourceOp.getOperatorId()).remove(connDesc);
+ if (!removed) {
+ throw new AsterixException("Connector desc not found");
+ }
+ spec.getConnectorMap().remove(connDesc.getConnectorId());
+ connectorOpMap.remove(connDesc.getConnectorId());
+
+ ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(requiredCardinality);
+ MToNPartitioningConnectorDescriptor newConnector = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ spec.getConnectorMap().put(newConnector.getConnectorId(), newConnector);
+ spec.connect(newConnector, sourceOp, 0, targetOp, 0);
+
+ // ==============================================================================
+ Set<Constraint> userConstraints = spec.getUserConstraints();
+ Constraint countConstraint = null;
+ Constraint locationConstraint = null;
+ List<LocationConstraint> locations = new ArrayList<LocationConstraint>();
+ IOperatorDescriptor changingOpDesc = null;
+
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ OperatorDescriptorId opId;
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT: {
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
+ if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
+ countConstraint = constraint;
+ changingOpDesc = opDesc;
+ }
+ }
+ break;
+ }
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ IOperatorDescriptor opDesc = spec.getOperatorMap().get(opId);
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ FeedRuntimeType runtimeType = ((FeedMetaOperatorDescriptor) opDesc).getRuntimeType();
+ if (runtimeType.equals(FeedRuntimeType.COMPUTE)) {
+ locationConstraint = constraint;
+ changingOpDesc = opDesc;
+ String location = (String) ((ConstantExpression) cexpr).getValue();
+ LocationConstraint lc = new LocationConstraint();
+ lc.location = location;
+ lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
+ locations.add(lc);
+ }
+ }
+
+ break;
+ }
+ }
+
+ userConstraints.remove(countConstraint);
+ if (locationConstraint != null) {
+ userConstraints.remove(locationConstraint);
+ }
+
+ return changingOpDesc;
+ }
+
+ private static String[] nChooseK(int k, List<String> locations) {
+ String[] result = new String[k];
+ for (int i = 0; i < k; i++) {
+ result[i] = locations.get(i);
+ }
+ return result;
+ }
+
+ private static boolean preProcessingRequired(FeedConnectionId connectionId) {
+ MetadataTransactionContext ctx = null;
+ Feed feed = null;
+ boolean preProcessingRequired = false;
+ try {
+ MetadataManager.INSTANCE.acquireReadLatch();
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ feed = MetadataManager.INSTANCE.getFeed(ctx, connectionId.getFeedId().getDataverse(), connectionId
+ .getFeedId().getFeedName());
+ preProcessingRequired = feed.getAppliedFunction() != null;
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception abortException) {
+ e.addSuppressed(abortException);
+ throw new IllegalStateException(e);
+ }
+ }
+ } finally {
+ MetadataManager.INSTANCE.releaseReadLatch();
+ }
+ return preProcessingRequired;
+ }
+
+ public static Triple<IFeedAdapterFactory, ARecordType, AdapterType> getPrimaryFeedFactoryAndOutput(
+ PrimaryFeed feed, FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException {
String adapterName = null;
DatasourceAdapter adapterEntity = null;
String adapterFactoryClassname = null;
- IAdapterFactory adapterFactory = null;
+ IFeedAdapterFactory adapterFactory = null;
ARecordType adapterOutputType = null;
- Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
+ Triple<IFeedAdapterFactory, ARecordType, AdapterType> feedProps = null;
+ AdapterType adapterType = null;
try {
- adapterName = feed.getAdapterName();
+ adapterName = feed.getAdaptorName();
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
adapterName);
if (adapterEntity == null) {
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
}
-
if (adapterEntity != null) {
+ adapterType = adapterEntity.getType();
adapterFactoryClassname = adapterEntity.getClassname();
- switch (adapterEntity.getType()) {
+ switch (adapterType) {
case INTERNAL:
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ adapterFactory = (IFeedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
break;
case EXTERNAL:
String[] anameComponents = adapterName.split("#");
String libraryName = anameComponents[0];
ClassLoader cl = ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(),
libraryName);
- adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+ adapterFactory = (IFeedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
break;
}
} else {
@@ -276,42 +497,90 @@ public class FeedUtil {
if (adapterFactoryClassname == null) {
adapterFactoryClassname = adapterName;
}
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
- }
-
- Map<String, String> configuration = feed.getAdapterConfiguration();
-
- switch (adapterFactory.getAdapterType()) {
- case TYPED:
- ((ITypedAdapterFactory) adapterFactory).configure(configuration);
- adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
- break;
- case GENERIC:
- String outputTypeName = configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
- if (outputTypeName == null) {
- throw new IllegalArgumentException(
- "You must specify the datatype associated with the incoming data. Datatype is specified by the "
- + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
- }
- Datatype datatype = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, feed.getDataverseName(),
- outputTypeName);
- if (datatype == null) {
- throw new Exception("no datatype \"" + outputTypeName + "\" in dataverse \""
- + feed.getDataverseName() + "\"");
- }
- adapterOutputType = (ARecordType) datatype.getDatatype();
- ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
- break;
- default:
- throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
+ adapterFactory = (IFeedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ adapterType = AdapterType.INTERNAL;
}
- feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
- adapterEntity.getType());
+ Map<String, String> configuration = feed.getAdaptorConfiguration();
+ configuration.putAll(policyAccessor.getFeedPolicy());
+ adapterOutputType = getOutputType(feed, configuration);
+ adapterFactory.configure(configuration, adapterOutputType);
+ feedProps = new Triple<IFeedAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
+ adapterType);
} catch (Exception e) {
e.printStackTrace();
- throw new AlgebricksException("unable to create adapter " + e);
+ throw new AlgebricksException("unable to create adapter " + e);
}
return feedProps;
}
+
+ private static ARecordType getOutputType(PrimaryFeed feed, Map<String, String> configuration) throws Exception {
+ ARecordType outputType = null;
+ String fqOutputType = configuration.get(IAdapterFactory.KEY_TYPE_NAME);
+
+ if (fqOutputType == null) {
+ throw new IllegalArgumentException("No output type specified");
+ }
+ String[] dataverseAndType = fqOutputType.split("[.]");
+ String dataverseName;
+ String datatypeName;
+
+ if (dataverseAndType.length == 1) {
+ datatypeName = dataverseAndType[0];
+ dataverseName = feed.getDataverseName();
+ } else if (dataverseAndType.length == 2) {
+ dataverseName = dataverseAndType[0];
+ datatypeName = dataverseAndType[1];
+ } else
+ throw new IllegalArgumentException("Invalid value for the parameter " + IAdapterFactory.KEY_TYPE_NAME);
+
+ MetadataTransactionContext ctx = null;
+ MetadataManager.INSTANCE.acquireReadLatch();
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
+ IAType type = t.getDatatype();
+ if (type.getTypeTag() != ATypeTag.RECORD) {
+ throw new IllegalStateException();
+ }
+ outputType = (ARecordType) t.getDatatype();
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ }
+ throw e;
+ } finally {
+ MetadataManager.INSTANCE.releaseReadLatch();
+ }
+ return outputType;
+ }
+
+ public static String getSecondaryFeedOutput(SecondaryFeed feed, FeedPolicyAccessor policyAccessor,
+ MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException {
+ String outputType = null;
+ String primaryFeedName = feed.getSourceFeedName();
+ Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);
+ FunctionSignature appliedFunction = primaryFeed.getAppliedFunction();
+ if (appliedFunction == null) {
+ Triple<IFeedAdapterFactory, ARecordType, AdapterType> result = getPrimaryFeedFactoryAndOutput(
+ (PrimaryFeed) primaryFeed, policyAccessor, mdTxnCtx);
+ outputType = result.second.getTypeName();
+ } else {
+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+ if (function != null) {
+ if (function.getLanguage().equals(Function.LANGUAGE_AQL)) {
+ throw new NotImplementedException(
+ "Secondary feeds derived from a source feed that has an applied AQL function are not supported yet.");
+ } else {
+ outputType = function.getReturnType();
+ }
+ } else {
+ throw new IllegalArgumentException("Function " + appliedFunction
+ + " associated with source feed not found in Metadata.");
+ }
+ }
+ return outputType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java
new file mode 100644
index 0000000..731ca56
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedWork;
+import edu.uci.ics.asterix.common.feeds.api.IFeedWorkEventListener;
+import edu.uci.ics.asterix.common.feeds.api.IFeedWorkManager;
+
+/**
+ * Handles asynchronous execution of feed management related tasks.
+ */
+public class FeedWorkManager implements IFeedWorkManager {
+
+ public static final FeedWorkManager INSTANCE = new FeedWorkManager();
+
+ private final ExecutorService executorService = Executors.newCachedThreadPool();
+
+ private FeedWorkManager() {
+ }
+
+ public void submitWork(IFeedWork work, IFeedWorkEventListener listener) {
+ Runnable runnable = work.getRunnable();
+ try {
+ executorService.execute(runnable);
+ listener.workCompleted(work);
+ } catch (Exception e) {
+ listener.workFailed(work, e);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
deleted file mode 100644
index f0a3aa4..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
- * Acts as a marker interface indicating that the implementation provides functionality
- * for creating an adapter.
- */
-public interface IAdapterFactory extends Serializable {
-
- /**
- * A 'GENERIC' adapter can be configured to return a given datatype.
- * A 'TYPED' adapter returns records with a pre-defined datatype.
- */
- public enum AdapterType {
- GENERIC,
- TYPED
- }
-
- public enum SupportedOperation {
- READ,
- WRITE,
- READ_WRITE
- }
-
- /**
- * Returns the type of adapter indicating if the adapter can be used for
- * reading from an external data source or writing to an external data
- * source or can be used for both purposes.
- *
- * @see SupportedOperation
- * @return
- */
- public SupportedOperation getSupportedOperations();
-
- /**
- * Returns the display name corresponding to the Adapter type that is created by the factory.
- *
- * @return the display name
- */
- public String getName();
-
- /**
- * Returns the type of the adapter (GENERIC or TYPED)
- *
- * @return
- */
- public AdapterType getAdapterType();
-
- /**
- * Returns a list of partition constraints. A partition constraint can be a
- * requirement to execute at a particular location or could be cardinality
- * constraints indicating the number of instances that need to run in
- * parallel. example, a IDatasourceAdapter implementation written for data
- * residing on the local file system of a node cannot run on any other node
- * and thus has a location partition constraint. The location partition
- * constraint can be expressed as a node IP address or a node controller id.
- * In the former case, the IP address is translated to a node controller id
- * running on the node with the given IP address.
- */
- public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception;
-
- /**
- * Creates an instance of IDatasourceAdapter.
- *
- * @param HyracksTaskContext
- * @param partition
- * @return An instance of IDatasourceAdapter.
- * @throws Exception
- */
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
deleted file mode 100644
index a4c5de9..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-
-/**
- * A super interface implemented by a data source adapter. An adapter can be a
- * pull based or push based. This interface provides all common APIs that need
- * to be implemented by each adapter irrespective of the the kind of
- * adapter(pull or push).
- */
-public interface IDatasourceAdapter extends Serializable {
-
- /**
- * An adapter can be used to read from an external data source and may also
- * allow writing to the external data source. This enum type indicates the
- * kind of operations supported by the adapter.
- */
-
- /**
- * Triggers the adapter to begin ingesting data from the external source.
- *
- * @param partition
- * The adapter could be running with a degree of parallelism.
- * partition corresponds to the i'th parallel instance.
- * @param writer
- * The instance of frame writer that is used by the adapter to
- * write frame to. Adapter packs the fetched bytes (from external source),
- * packs them into frames and forwards the frames to an upstream receiving
- * operator using the instance of IFrameWriter.
- * @throws Exception
- */
- public void start(int partition, IFrameWriter writer) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
deleted file mode 100644
index 55abd73..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-/**
- * Interface implemented by an adapter that can be controlled or managed by external
- * commands (stop,alter)
- */
-public interface IFeedAdapter extends IDatasourceAdapter {
-
- public enum DataExchangeMode {
- PULL,
- PUSH
- }
-
- /**
- * @return
- */
- public DataExchangeMode getDataExchangeMode();
-
- /**
- * Discontinue the ingestion of data and end the feed.
- *
- * @throws Exception
- */
- public void stop() throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java
new file mode 100644
index 0000000..fc6bfe7
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapterFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
+
+public interface IFeedAdapterFactory extends IAdapterFactory {
+
+ public boolean isRecordTrackingEnabled();
+
+ public IIntakeProgressTracker createIntakeProgressTracker();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
deleted file mode 100644
index 47aa8d9..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-
-public interface IGenericAdapterFactory extends IAdapterFactory {
-
- public static final String KEY_TYPE_NAME = "type-name";
-
- public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
-
- public void setFiles(List<ExternalFile> files) throws AlgebricksException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
index 50641b0..dbc16d6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
@@ -14,6 +14,8 @@ x * Copyright 2009-2013 by The Regents of the University of California
*/
package edu.uci.ics.asterix.metadata.feeds;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+
public interface IPullBasedFeedAdapter extends IFeedAdapter {
/**
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
index 6faa44b..de2086e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -16,6 +16,7 @@ package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
public interface ITypedAdapterFactory extends IAdapterFactory {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
deleted file mode 100644
index 6cfdc7f..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.metadata.feeds;
-
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.common.feeds.FeedRuntime;
-
-public class IngestionRuntime extends FeedRuntime {
-
- private AdapterRuntimeManager adapterRuntimeManager;
-
- public IngestionRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType,
- AdapterRuntimeManager adapterRuntimeManager) {
- super(feedId, partition, feedRuntimeType);
- this.adapterRuntimeManager = adapterRuntimeManager;
- }
-
- public AdapterRuntimeManager getAdapterRuntimeManager() {
- return adapterRuntimeManager;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java
new file mode 100644
index 0000000..8c6d54d
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/PrepareStallMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+/**
+ * A feed control message indicating the need to end the feed. This message is dispatched
+ * to all locations that host an operator involved in the feed pipeline.
+ */
+public class PrepareStallMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+
+ private final int computePartitionsRetainLimit;
+
+ public PrepareStallMessage(FeedConnectionId connectionId, int computePartitionsRetainLimit) {
+ super(MessageType.PREPARE_STALL);
+ this.connectionId = connectionId;
+ this.computePartitionsRetainLimit = computePartitionsRetainLimit;
+ }
+
+ @Override
+ public String toString() {
+ return MessageType.PREPARE_STALL.name() + " " + connectionId;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.COMPUTE_PARTITION_RETAIN_LIMIT, computePartitionsRetainLimit);
+ return obj;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public int getComputePartitionsRetainLimit() {
+ return computePartitionsRetainLimit;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
index d7e2e2f..404e14b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/RemoteSocketMessageListener.java
@@ -31,8 +31,7 @@ public class RemoteSocketMessageListener {
private final String host;
private final int port;
private final LinkedBlockingQueue<String> outbox;
-
- private ExecutorService executorService = Executors.newFixedThreadPool(10);
+ private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private RemoteMessageListenerServer listenerServer;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java
new file mode 100644
index 0000000..515b78e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SocketMessageListener.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IMessageReceiver;
+
+/**
+ * Listens for messages at a configured port and redirects them to a
+ * an instance of {@code IMessageReceiver}.
+ * Messages may arrive in parallel from multiple senders. Each sender is handled by
+ * a respective instance of {@code ClientHandler}.
+ */
+public class SocketMessageListener {
+
+ private static final Logger LOGGER = Logger.getLogger(SocketMessageListener.class.getName());
+
+ private final int port;
+ private final IMessageReceiver<String> messageReceiver;
+ private final MessageListenerServer listenerServer;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ public SocketMessageListener(int port, IMessageReceiver<String> messageReceiver) {
+ this.port = port;
+ this.messageReceiver = messageReceiver;
+ this.listenerServer = new MessageListenerServer(port, messageReceiver);
+ }
+
+ public void stop() throws IOException {
+ listenerServer.stop();
+ messageReceiver.close(false);
+ if (!executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
+ }
+
+ public void start() {
+ messageReceiver.start();
+ executorService.execute(listenerServer);
+ }
+
+ private static class MessageListenerServer implements Runnable {
+
+ private final int port;
+ private final IMessageReceiver<String> messageReceiver;
+ private ServerSocket server;
+ private final Executor executor;
+
+ public MessageListenerServer(int port, IMessageReceiver<String> messageReceiver) {
+ this.port = port;
+ this.messageReceiver = messageReceiver;
+ this.executor = Executors.newCachedThreadPool();
+ }
+
+ public void stop() throws IOException {
+ server.close();
+ }
+
+ @Override
+ public void run() {
+ Socket client = null;
+ try {
+ server = new ServerSocket(port);
+ while (true) {
+ client = server.accept();
+ ClientHandler handler = new ClientHandler(client, messageReceiver);
+ executor.execute(handler);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to start Message listener" + server);
+ }
+ } finally {
+ if (server != null) {
+ try {
+ server.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private static class ClientHandler implements Runnable {
+
+ private static final char EOL = (char) "\n".getBytes()[0];
+
+ private final Socket client;
+ private final IMessageReceiver<String> messageReceiver;
+
+ public ClientHandler(Socket client, IMessageReceiver<String> messageReceiver) {
+ this.client = client;
+ this.messageReceiver = messageReceiver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ InputStream in = client.getInputStream();
+ CharBuffer buffer = CharBuffer.allocate(5000);
+ char ch;
+ while (true) {
+ ch = (char) in.read();
+ if (((int) ch) == -1) {
+ break;
+ }
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array(), 0, buffer.limit());
+ messageReceiver.sendMessage(s + "\n");
+ buffer.position(0);
+ buffer.limit(5000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to process mesages from client" + client);
+ }
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java
new file mode 100644
index 0000000..c22d2e1
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/TerminateDataFlowMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+public class TerminateDataFlowMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId connectionId;
+
+ public TerminateDataFlowMessage(FeedConnectionId connectionId) {
+ super(MessageType.TERMINATE_FLOW);
+ this.connectionId = connectionId;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ return obj;
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java
new file mode 100644
index 0000000..e36ea2b
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/XAQLFeedMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedConstants;
+import edu.uci.ics.asterix.common.feeds.message.FeedMessage;
+
+/**
+ * A feed control message indicating the need to execute a give AQL.
+ */
+public class XAQLFeedMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String aql;
+ private final FeedConnectionId connectionId;
+
+ public XAQLFeedMessage(FeedConnectionId connectionId, String aql) {
+ super(MessageType.XAQL);
+ this.connectionId = connectionId;
+ this.aql = aql;
+ }
+
+ @Override
+ public String toString() {
+ return messageType.name() + " " + connectionId + " [" + aql + "] ";
+ }
+
+ public FeedConnectionId getConnectionId() {
+ return connectionId;
+ }
+
+ public String getAql() {
+ return aql;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put(FeedConstants.MessageConstants.MESSAGE_TYPE, messageType.name());
+ obj.put(FeedConstants.MessageConstants.DATAVERSE, connectionId.getFeedId().getDataverse());
+ obj.put(FeedConstants.MessageConstants.FEED, connectionId.getFeedId().getFeedName());
+ obj.put(FeedConstants.MessageConstants.DATASET, connectionId.getDatasetName());
+ obj.put(FeedConstants.MessageConstants.AQL, aql);
+ return obj;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
index 086caf1..8165c9c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -38,8 +38,8 @@ public class MetadataBuiltinFunctions {
addMetadataBuiltinFunctions();
AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.DATASET, false);
AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.DATASET);
- AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INGEST, false);
- AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INGEST);
+ AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_COLLECT, false);
+ AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_COLLECT);
AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INTERCEPT, false);
AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INTERCEPT);
}
@@ -89,17 +89,17 @@ public class MetadataBuiltinFunctions {
}
}, true);
- AsterixBuiltinFunctions.addPrivateFunction(AsterixBuiltinFunctions.FEED_INGEST, new IResultTypeComputer() {
+ AsterixBuiltinFunctions.addPrivateFunction(AsterixBuiltinFunctions.FEED_COLLECT, new IResultTypeComputer() {
@Override
public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
IMetadataProvider<?, ?> mp) throws AlgebricksException {
AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
- if (f.getArguments().size() != 3) {
- throw new AlgebricksException("Incorrect number of arguments -> arity is 3, not "
- + f.getArguments().size());
+ if (f.getArguments().size() != AsterixBuiltinFunctions.FEED_COLLECT.getArity()) {
+ throw new AlgebricksException("Incorrect number of arguments -> arity is "
+ + AsterixBuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
}
- ILogicalExpression a1 = f.getArguments().get(1).getValue();
+ ILogicalExpression a1 = f.getArguments().get(5).getValue();
IAType t1 = (IAType) env.getType(a1);
if (t1.getTypeTag() == ATypeTag.ANY) {
return BuiltinType.ANY;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
index 436fe4d..e456f89 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
@@ -15,6 +15,7 @@ public class MetadataLockManager {
private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
@@ -24,6 +25,7 @@ public class MetadataLockManager {
functionsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
nodeGroupsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
feedsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ feedPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
compactionPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
dataTypeLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
}
@@ -205,6 +207,19 @@ public class MetadataLockManager {
public void releaseFeedWriteLock(String feedName) {
feedsLocks.get(feedName).writeLock().unlock();
}
+
+ public void acquireFeedPolicyWriteLock(String policyName) {
+ ReentrantReadWriteLock fLock = feedPolicyLocks.get(policyName);
+ if (fLock == null) {
+ feedPolicyLocks.putIfAbsent(policyName, new ReentrantReadWriteLock());
+ fLock = feedPolicyLocks.get(policyName);
+ }
+ fLock.writeLock().lock();
+ }
+
+ public void releaseFeedPolicyWriteLock(String policyName) {
+ feedPolicyLocks.get(policyName).writeLock().unlock();
+ }
public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
@@ -411,6 +426,16 @@ public class MetadataLockManager {
releaseFeedWriteLock(feedFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
+
+ public void dropFeedPolicyBegin(String dataverseName, String policyName) {
+ releaseFeedWriteLock(policyName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void dropFeedPolicyEnd(String dataverseName, String policyName) {
+ releaseFeedWriteLock(policyName);
+ releaseDataverseReadLock(dataverseName);
+ }
public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
acquireDataverseReadLock(dataverseName);
@@ -434,6 +459,16 @@ public class MetadataLockManager {
releaseDataverseReadLock(dataverseName);
}
+ public void createFeedPolicyBegin(String dataverseName, String policyName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireFeedPolicyWriteLock(policyName);
+ }
+
+ public void createFeedPolicyEnd(String dataverseName, String policyName) {
+ releaseFeedPolicyWriteLock(policyName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
String feedFullyQualifiedName) {
acquireDataverseReadLock(dataverseName);
@@ -446,6 +481,19 @@ public class MetadataLockManager {
releaseDatasetReadLock(datasetFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
+
+ public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName,
+ String feedFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetReadLock(datasetFullyQualifiedName);
+ acquireFeedReadLock(feedFullyQualifiedName);
+ }
+
+ public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+ releaseFeedReadLock(feedFullyQualifiedName);
+ releaseDatasetReadLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
acquireDataverseReadLock(dataverseName);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index cfecd9c..fa99064 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -273,8 +273,8 @@ public class AsterixBuiltinFunctions {
"string-join", 2);
public final static FunctionIdentifier DATASET = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset", 1);
- public final static FunctionIdentifier FEED_INGEST = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
- "feed-ingest", 3);
+ public final static FunctionIdentifier FEED_COLLECT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+ "feed-collect", 6);
public final static FunctionIdentifier FEED_INTERCEPT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"feed-intercept", 1);
@@ -1253,14 +1253,14 @@ public class AsterixBuiltinFunctions {
static {
datasetFunctions.add(getAsterixFunctionInfo(DATASET));
- datasetFunctions.add(getAsterixFunctionInfo(FEED_INGEST));
+ datasetFunctions.add(getAsterixFunctionInfo(FEED_COLLECT));
datasetFunctions.add(getAsterixFunctionInfo(FEED_INTERCEPT));
datasetFunctions.add(getAsterixFunctionInfo(INDEX_SEARCH));
}
static {
addUnnestFun(DATASET, false);
- addUnnestFun(FEED_INGEST, false);
+ addUnnestFun(FEED_COLLECT, false);
addUnnestFun(FEED_INTERCEPT, false);
addUnnestFun(RANGE, true);
addUnnestFun(SCAN_COLLECTION, false);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
index eb4a027..7d98846 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/pointables/ARecordPointable.java
@@ -275,6 +275,10 @@ public class ARecordPointable extends AbstractVisitablePointable {
public List<IVisitablePointable> getFieldValues() {
return fieldValues;
}
+
+ public ARecordType getInputRecordType(){
+ return inputRecType;
+ }
@Override
public <R, T> R accept(IVisitablePointableVisitor<R, T> vistor, T tag) throws AsterixException {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
index cef7937..a24f2de 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
@@ -18,6 +18,7 @@ import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -46,6 +47,7 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
private AsterixMetadataProperties metadataProperties;
private AsterixStorageProperties storageProperties;
private AsterixTransactionProperties txnProperties;
+ private AsterixFeedProperties feedProperties;
private IHyracksClientConnection hcc;
@@ -59,6 +61,7 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
INSTANCE.storageProperties = new AsterixStorageProperties(propertiesAccessor);
INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
+ INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor);
INSTANCE.hcc = hcc;
Logger.getLogger("edu.uci.ics").setLevel(INSTANCE.externalProperties.getLogLevel());
}
@@ -102,6 +105,11 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
return externalProperties;
}
+ @Override
+ public AsterixFeedProperties getFeedProperties() {
+ return feedProperties;
+ }
+
public IHyracksClientConnection getHcc() {
return hcc;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 8e393d7..de3b1b9 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -28,6 +28,7 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -70,12 +71,9 @@ public class AsterixClusterProperties {
}
}
- public enum State {
- ACTIVE,
- UNUSABLE
- }
+
- private State state = State.UNUSABLE;
+ private ClusterState state = ClusterState.UNUSABLE;
public synchronized void removeNCConfiguration(String nodeId) {
// state = State.UNUSABLE;
@@ -87,7 +85,7 @@ public class AsterixClusterProperties {
ncConfiguration.put(nodeId, configuration);
if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
.getNodeNames().size()) {
- state = State.ACTIVE;
+ state = ClusterState.ACTIVE;
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" Registering configuration parameters for node id " + nodeId);
@@ -129,7 +127,7 @@ public class AsterixClusterProperties {
return ncConfig.get(IO_DEVICES).split(",");
}
- public State getState() {
+ public ClusterState getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
index 3871aef..8fdd14c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -309,8 +310,8 @@ import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.PrintTimeDescri
import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.TimeFromDatetimeDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.TimeFromUnixTimeInMsDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.temporal.YearMonthDurationComparatorDecriptor;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
import edu.uci.ics.asterix.runtime.unnestingfunctions.std.RangeDescriptor;
import edu.uci.ics.asterix.runtime.unnestingfunctions.std.ScanCollectionDescriptor;
@@ -1081,23 +1082,26 @@ public class NonTaggedDataFormat implements IDataFormat {
@Override
public ITupleParserFactory createTupleParser(ARecordType recType, boolean delimitedFormat, char delimiter,
char quote, boolean hasHeader) {
+ Map<String, String> conf = new HashMap<String, String>();
+ AsterixTupleParserFactory.InputDataFormat inputFormat = null;
if (delimitedFormat) {
- int n = recType.getFieldTypes().length;
- IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
- IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
- }
- return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter, quote, hasHeader);
+ conf.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT);
+ conf.put(AsterixTupleParserFactory.KEY_DELIMITER, "" + delimiter);
+ inputFormat = InputDataFormat.DELIMITED;
} else {
- return new AdmSchemafullRecordParserFactory(recType);
+ conf.put(AsterixTupleParserFactory.KEY_FORMAT, AsterixTupleParserFactory.FORMAT_ADM);
+ inputFormat = InputDataFormat.ADM;
}
+
+ if (hasHeader) {
+ conf.put(AsterixTupleParserFactory.HAS_HEADER,
+ hasHeader ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+ }
+ conf.put(AsterixTupleParserFactory.KEY_QUOTE, "" + quote);
+ return new AsterixTupleParserFactory(conf, recType, inputFormat);
}
+
@Override
public INullWriterFactory getNullWriterFactory() {
return AqlNullWriterFactory.INSTANCE;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
index d9103ef..d42e0d9 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -20,18 +20,16 @@ import java.io.InputStream;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.comm.VSizeFrame;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
/**
- * An Abstract class implementation for ITupleParser. It provides common
+ * An abstract class implementation for ITupleParser. It provides common
* functionality involved in parsing data in an external format and packing
* frames with formed tuples.
*/
@@ -41,37 +39,34 @@ public abstract class AbstractTupleParser implements ITupleParser {
protected ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
protected DataOutput dos = tb.getDataOutput();
- protected final FrameTupleAppender appender;
protected final ARecordType recType;
protected final IHyracksTaskContext ctx;
- protected String filename;
public AbstractTupleParser(IHyracksTaskContext ctx, ARecordType recType) throws HyracksDataException {
- appender = new FrameTupleAppender(new VSizeFrame(ctx));
this.recType = recType;
this.ctx = ctx;
}
- public void setFilename(String filename) {
- this.filename = filename;
- }
-
public abstract IDataParser getDataParser();
+ public abstract ITupleForwardPolicy getTupleParserPolicy();
+
@Override
public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
IDataParser parser = getDataParser();
+ ITupleForwardPolicy policy = getTupleParserPolicy();
try {
parser.initialize(in, recType, true);
+ policy.initialize(ctx, writer);
while (true) {
tb.reset();
if (!parser.parse(tb.getDataOutput())) {
break;
}
tb.addFieldEndOffset();
- addTupleToFrame(writer);
+ policy.addTuple(tb);
}
- appender.flush(writer, true);
+ policy.close();
} catch (AsterixException ae) {
throw new HyracksDataException(ae);
} catch (IOException ioe) {
@@ -79,15 +74,4 @@ public abstract class AbstractTupleParser implements ITupleParser {
}
}
- protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException("Tuple size(" + tb.getSize() + ") is greater than frame size("
- + AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize() + ")");
- }
- }
-
- }
-
}