You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2015/06/29 21:45:21 UTC

[24/24] incubator-asterixdb git commit: Introduces Feeds 2.0

Introduces Feeds 2.0

commit c3f577861fc705d848c1641605689cadd6973bae
Merge: ebc4cae fc0c2c0
Author: ramangrover29 <ra...@gmail.com>
Date:   Fri Jun 26 13:04:05 2015 -0700

    Merge branch 'raman/feeds_2_release' of https://code.google.com/p/asterixdb-sandbox into raman/feeds_2_release

    Conflicts:
    	asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
    	asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java

commit ebc4cae21a7302869f953df1ebda601e798d12d2
Author: ramangrover29 <ra...@gmail.com>
Date:   Sat Jun 20 17:14:45 2015 -0700

    Introduces Feeds 2.0

    Some of the prominent chnages introduced are as follows
     a) Support for building a cascade network of feeds (via secondary feeds feature)
     b) Feed Management Console for tracking active feeds and associated metrics
     c) Support for elastic runtime for data ingestion
     d) Improved fault-tolerance with support for logging of failed records

    Documentation has been added at asterix-doc/src/site/markdown/feeds/

commit fc0c2c0549a6ee8b202e57607d2e110478cd57bb
Author: ramangrover29 <ra...@gmail.com>
Date:   Sat Jun 20 17:14:45 2015 -0700

    Introduces Feeds 2.0

    Some of the prominent chnages introduced are as follows
     a) Support for building a cascade network of feeds (via secondary feeds feature)
     b) Feed Management Console for tracking active feeds and associated metrics
     c) Support for elastic runtime for data ingestion
     d) Improved fault-tolerance with support for logging of failed records

    Documentation has been added at asterix-doc/src/site/markdown/feeds/

Change-Id: I498f01c591a229aaf51cec43ab20f3e5c4f072f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/297
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sj...@ucr.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/ae85a1dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/ae85a1dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/ae85a1dc

Branch: refs/heads/master
Commit: ae85a1dc862c5430095fa9f4a38735a2bfe298f6
Parents: c0c2c1b
Author: ramangrover29 <ra...@gmail.com>
Authored: Fri Jun 26 17:26:05 2015 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Sun Jun 28 15:52:09 2015 -0700

----------------------------------------------------------------------
 asterix-algebra/pom.xml                         |   57 +-
 .../physical/ExternalDataLookupPOperator.java   |    2 +-
 .../asterix/optimizer/base/AnalysisUtil.java    |    2 +-
 .../asterix/optimizer/base/RuleCollections.java |    4 +-
 ...ceRandomPartitioningFeedComputationRule.java |   98 ++
 ...IntroduceSecondaryIndexInsertDeleteRule.java |    2 +-
 .../optimizer/rules/PushFieldAccessRule.java    |    8 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   47 +-
 .../translator/AbstractAqlTranslator.java       |   19 +-
 .../AqlExpressionToPlanTranslator.java          |   36 +-
 .../AqlPlusExpressionToPlanTranslator.java      |   33 +-
 .../asterix/translator/CompiledStatements.java  |   43 +
 .../data/twitter/obamatweets_duplicate.adm      |   12 +
 .../api/common/AsterixAppRuntimeContext.java    |   16 +-
 .../asterix/api/common/FeedWorkCollection.java  |  197 +++
 .../api/http/servlet/ConnectorAPIServlet.java   |    3 +-
 .../asterix/api/http/servlet/DDLAPIServlet.java |    2 +-
 .../api/http/servlet/FeedDashboardServlet.java  |  133 --
 .../http/servlet/FeedDataProviderServlet.java   |  118 --
 .../asterix/api/http/servlet/FeedServlet.java   |  170 ++-
 .../api/http/servlet/FeedServletUtil.java       |   13 +-
 .../api/http/servlet/QueryResultAPIServlet.java |    2 -
 .../api/http/servlet/RESTAPIServlet.java        |    2 -
 .../asterix/aql/translator/AqlTranslator.java   |  616 ++++++---
 .../ics/asterix/feeds/CentralFeedManager.java   |  100 ++
 .../uci/ics/asterix/feeds/FeedCollectInfo.java  |   36 +
 .../edu/uci/ics/asterix/feeds/FeedInfo.java     |   35 +
 .../feeds/FeedJobNotificationHandler.java       |  739 +++++++++++
 .../edu/uci/ics/asterix/feeds/FeedJoint.java    |  186 +++
 .../asterix/feeds/FeedLifecycleListener.java    |  486 +++++++
 .../uci/ics/asterix/feeds/FeedLoadManager.java  |  298 +++++
 .../edu/uci/ics/asterix/feeds/FeedManager.java  |  140 +++
 .../ics/asterix/feeds/FeedMessageReceiver.java  |   74 ++
 .../ics/asterix/feeds/FeedMetadataManager.java  |   93 ++
 .../ics/asterix/feeds/FeedTrackingManager.java  |  184 +++
 .../feeds/FeedWorkRequestResponseHandler.java   |  263 ++++
 .../uci/ics/asterix/feeds/FeedsActivator.java   |   95 ++
 .../uci/ics/asterix/file/FeedOperations.java    |  221 +++-
 .../bootstrap/AsterixGlobalRecoveryManager.java |   19 +-
 .../bootstrap/CCApplicationEntryPoint.java      |   12 +-
 .../bootstrap/ClusterLifecycleListener.java     |   14 +-
 .../hyracks/bootstrap/ClusterWorkExecutor.java  |    6 +-
 .../bootstrap/ExternalLibraryBootstrap.java     |   24 +-
 .../hyracks/bootstrap/FeedBootstrap.java        |   50 +
 .../bootstrap/FeedLifecycleListener.java        | 1188 ------------------
 .../FeedWorkRequestResponseHandler.java         |  337 -----
 ...ConstantTupleSourceOperatorNodePushable.java |    1 -
 .../ics/asterix/test/runtime/ExecutionTest.java |    4 +-
 .../metadata/results/basic/meta16/meta16.1.adm  |   27 +-
 .../metadata/results/basic/meta17/meta17.1.adm  |  143 +--
 .../metadata/results/basic/meta19/meta19.1.adm  |   35 +-
 .../metadata_dataset/metadata_dataset.1.adm     |   27 +-
 .../metadata_datatype/metadata_datatype.1.adm   |  143 +--
 .../basic/metadata_index/metadata_index.1.adm   |   35 +-
 .../queries/feeds/feeds_05/feeds_05.1.ddl.aql   |    4 +-
 .../queries/feeds/feeds_06/feeds_06.1.ddl.aql   |    2 +-
 .../queries/feeds/feeds_07/feeds_07.1.ddl.aql   |   16 +-
 .../queries/feeds/feeds_08/feeds_08.1.ddl.aql   |   25 +-
 .../queries/feeds/feeds_09/feeds_09.1.ddl.aql   |   42 +
 .../feeds/feeds_09/feeds_09.2.update.aql        |   16 +
 .../queries/feeds/feeds_09/feeds_09.3.query.aql |   22 +
 .../queries/feeds/feeds_10/feeds_10.1.ddl.aql   |   30 +
 .../feeds/feeds_10/feeds_10.2.update.aql        |   15 +
 .../queries/feeds/feeds_10/feeds_10.3.query.aql |   16 +
 .../queries/feeds/feeds_11/feeds_11.1.ddl.aql   |   27 +
 .../feeds/feeds_11/feeds_11.2.update.aql        |   12 +
 .../queries/feeds/feeds_11/feeds_11.3.query.aql |   11 +
 .../queries/feeds/feeds_12/feeds_12.1.ddl.aql   |   30 +
 .../feeds/feeds_12/feeds_12.2.update.aql        |   12 +
 .../queries/feeds/feeds_12/feeds_12.3.query.aql |   11 +
 .../results/feeds/feeds_01/feeds_01.1.adm       |    2 +-
 .../results/feeds/feeds_03/feeds_03.1.adm       |    2 +-
 .../results/feeds/feeds_09/feeds_09.1.adm       |    2 +
 .../results/feeds/feeds_10/feeds_10.1.adm       |    2 +
 .../results/feeds/feeds_11/feeds_11.1.adm       |   13 +
 .../results/feeds/feeds_12/feeds_12.1.adm       |   12 +
 .../src/test/resources/runtimets/testsuite.xml  |   30 +-
 .../edu/uci/ics/asterix/aql/base/Statement.java |    6 +-
 .../aql/expression/ConnectFeedStatement.java    |   10 +-
 .../expression/CreateFeedPolicyStatement.java   |   91 ++
 .../aql/expression/CreateFeedStatement.java     |   39 +-
 .../expression/CreatePrimaryFeedStatement.java  |   54 +
 .../CreateSecondaryFeedStatement.java           |   57 +
 .../aql/expression/FeedPolicyDropStatement.java |   61 +
 .../aql/expression/SubscribeFeedStatement.java  |  207 +++
 .../aql/expression/visitor/AQLPrintVisitor.java |   13 +-
 .../visitor/IAqlExpressionVisitor.java          |   17 +-
 .../visitor/IAqlVisitorWithVoidReturn.java      |    8 +-
 .../ics/asterix/aql/rewrites/AqlRewriter.java   |   32 +-
 .../CloneAndSubstituteVariablesVisitor.java     |   33 +-
 .../asterix/aql/rewrites/InlineUdfsVisitor.java |   33 +-
 asterix-aql/src/main/javacc/AQL.jj              |   55 +-
 .../common/api/IAsterixAppRuntimeContext.java   |    2 +-
 .../common/api/IClusterEventsSubscriber.java    |   47 +
 .../common/api/IClusterManagementWork.java      |   34 +
 .../api/IClusterManagementWorkResponse.java     |   27 +
 .../common/config/AsterixFeedProperties.java    |   62 +
 .../config/IAsterixPropertiesProvider.java      |    2 +
 .../common/exceptions/FrameDataException.java   |   34 +
 .../common/feeds/BasicMonitoredBuffer.java      |   56 +
 .../asterix/common/feeds/CollectionRuntime.java |   88 ++
 .../feeds/ComputeSideMonitoredBuffer.java       |   55 +
 .../ics/asterix/common/feeds/DataBucket.java    |   84 ++
 .../asterix/common/feeds/DataBucketPool.java    |  106 ++
 .../common/feeds/DistributeFeedFrameWriter.java |  135 ++
 .../ics/asterix/common/feeds/FeedActivity.java  |  115 ++
 .../feeds/FeedCollectRuntimeInputHandler.java   |   43 +
 .../common/feeds/FeedConnectJobInfo.java        |   75 ++
 .../asterix/common/feeds/FeedConnectionId.java  |   28 +-
 .../common/feeds/FeedConnectionRequest.java     |  122 ++
 .../ics/asterix/common/feeds/FeedConstants.java |   53 +
 .../common/feeds/FeedExceptionHandler.java      |  104 ++
 .../asterix/common/feeds/FeedFrameCache.java    |  167 +++
 .../common/feeds/FeedFrameCollector.java        |  154 +++
 .../common/feeds/FeedFrameDiscarder.java        |   49 +
 .../asterix/common/feeds/FeedFrameHandlers.java |  302 +++++
 .../asterix/common/feeds/FeedFrameSpiller.java  |  172 +++
 .../common/feeds/FeedFrameTupleAccessor.java    |   92 ++
 .../ics/asterix/common/feeds/FeedFrameUtil.java |   84 ++
 .../uci/ics/asterix/common/feeds/FeedId.java    |   62 +
 .../asterix/common/feeds/FeedIntakeInfo.java    |   44 +
 .../ics/asterix/common/feeds/FeedJobInfo.java   |   68 +
 .../ics/asterix/common/feeds/FeedJointKey.java  |   75 ++
 .../asterix/common/feeds/FeedMemoryManager.java |  108 ++
 .../common/feeds/FeedMessageService.java        |  173 ++-
 .../common/feeds/FeedMetricCollector.java       |  185 +++
 .../common/feeds/FeedPolicyAccessor.java        |  164 +++
 .../ics/asterix/common/feeds/FeedReport.java    |  116 --
 .../ics/asterix/common/feeds/FeedRuntime.java   |  158 +--
 .../ics/asterix/common/feeds/FeedRuntimeId.java |   76 ++
 .../common/feeds/FeedRuntimeInputHandler.java   |  426 +++++++
 .../common/feeds/FeedRuntimeManager.java        |  197 +--
 .../asterix/common/feeds/FeedRuntimeReport.java |    5 +
 .../common/feeds/FeedStorageStatistics.java     |    9 +
 .../common/feeds/FeedTupleCommitAckMessage.java |   93 ++
 .../feeds/FeedTupleCommitResponseMessage.java   |   76 ++
 .../asterix/common/feeds/FrameCollection.java   |   97 ++
 .../asterix/common/feeds/FrameDistributor.java  |  359 ++++++
 .../common/feeds/FrameEventCallback.java        |   98 ++
 .../ics/asterix/common/feeds/IFeedManager.java  |  101 --
 .../common/feeds/IFramePostProcessor.java       |   10 +
 .../common/feeds/IFramePreprocessor.java        |    8 +
 .../asterix/common/feeds/IngestionRuntime.java  |   71 ++
 .../common/feeds/IntakePartitionStatistics.java |   27 +
 .../common/feeds/IntakeSideMonitoredBuffer.java |   56 +
 .../asterix/common/feeds/MessageListener.java   |   61 +-
 .../asterix/common/feeds/MessageReceiver.java   |  107 ++
 .../asterix/common/feeds/MonitoredBuffer.java   |  386 ++++++
 .../common/feeds/MonitoredBufferTimerTasks.java |  290 +++++
 .../uci/ics/asterix/common/feeds/NodeLoad.java  |   44 +
 .../asterix/common/feeds/NodeLoadReport.java    |   95 ++
 .../common/feeds/NodeLoadReportService.java     |  104 ++
 .../uci/ics/asterix/common/feeds/Series.java    |   26 +
 .../uci/ics/asterix/common/feeds/SeriesAvg.java |   29 +
 .../ics/asterix/common/feeds/SeriesRate.java    |   74 ++
 .../common/feeds/StorageFrameHandler.java       |  100 ++
 .../feeds/StorageSideMonitoredBuffer.java       |  189 +++
 .../common/feeds/SubscribableFeedRuntimeId.java |   48 +
 .../common/feeds/SubscribableRuntime.java       |   86 ++
 .../asterix/common/feeds/SuperFeedManager.java  |  447 -------
 .../feeds/api/IAdapterRuntimeManager.java       |   80 ++
 .../common/feeds/api/ICentralFeedManager.java   |   16 +
 .../common/feeds/api/IDatasourceAdapter.java    |   43 +
 .../common/feeds/api/IExceptionHandler.java     |   39 +
 .../asterix/common/feeds/api/IFeedAdapter.java  |   56 +
 .../feeds/api/IFeedConnectionManager.java       |   71 ++
 .../common/feeds/api/IFeedFrameHandler.java     |   35 +
 .../asterix/common/feeds/api/IFeedJoint.java    |  117 ++
 .../api/IFeedLifecycleEventSubscriber.java      |   19 +
 .../IFeedLifecycleIntakeEventSubscriber.java    |   10 +
 .../feeds/api/IFeedLifecycleListener.java       |   52 +
 .../common/feeds/api/IFeedLoadManager.java      |   42 +
 .../asterix/common/feeds/api/IFeedManager.java  |   93 ++
 .../common/feeds/api/IFeedMemoryComponent.java  |   54 +
 .../common/feeds/api/IFeedMemoryManager.java    |   54 +
 .../asterix/common/feeds/api/IFeedMessage.java  |   48 +
 .../common/feeds/api/IFeedMessageService.java   |   30 +
 .../common/feeds/api/IFeedMetadataManager.java  |   21 +
 .../common/feeds/api/IFeedMetricCollector.java  |   46 +
 .../api/IFeedOperatorOutputSideHandler.java     |   32 +
 .../asterix/common/feeds/api/IFeedProvider.java |    8 +
 .../asterix/common/feeds/api/IFeedRuntime.java  |   58 +
 .../asterix/common/feeds/api/IFeedService.java  |   22 +
 .../feeds/api/IFeedSubscriptionManager.java     |   37 +
 .../common/feeds/api/IFeedTrackingManager.java  |   11 +
 .../ics/asterix/common/feeds/api/IFeedWork.java |   24 +
 .../feeds/api/IFeedWorkEventListener.java       |   37 +
 .../common/feeds/api/IFeedWorkManager.java      |    7 +
 .../common/feeds/api/IFrameEventCallback.java   |   14 +
 .../feeds/api/IIntakeProgressTracker.java       |   11 +
 .../common/feeds/api/IMessageReceiver.java      |   24 +
 .../common/feeds/api/ISubscribableRuntime.java  |   57 +
 .../common/feeds/api/ISubscriberRuntime.java    |   26 +
 .../common/feeds/api/ISubscriptionProvider.java |   11 +
 .../feeds/api/ITupleTrackingFeedAdapter.java    |    6 +
 .../common/feeds/message/EndFeedMessage.java    |   93 ++
 .../feeds/message/FeedCongestionMessage.java    |   99 ++
 .../common/feeds/message/FeedMessage.java       |   38 +
 .../feeds/message/FeedMessageService.java       |  143 +++
 .../common/feeds/message/FeedReportMessage.java |   96 ++
 .../message/FeedTupleCommitAckMessage.java      |   95 ++
 .../message/FeedTupleCommitResponseMessage.java |   69 +
 .../common/feeds/message/NodeReportMessage.java |   65 +
 .../feeds/message/ScaleInReportMessage.java     |  110 ++
 .../feeds/message/StorageReportFeedMessage.java |  125 ++
 .../message/ThrottlingEnabledFeedMessage.java   |   82 ++
 .../common/parse/IAsterixTupleParser.java       |   11 +
 .../common/parse/ITupleForwardPolicy.java       |   30 +
 .../common/parse/ITupleParserPolicy.java        |   27 +
 asterix-doc/src/site/markdown/feeds/tutorial.md |  287 +++++
 asterix-external-data/pom.xml                   |    7 +-
 .../adapter/factory/CNNFeedAdapterFactory.java  |   33 +-
 .../adapter/factory/HDFSAdapterFactory.java     |   56 +-
 .../factory/HDFSIndexingAdapterFactory.java     |   40 +-
 .../adapter/factory/HiveAdapterFactory.java     |   31 +-
 .../factory/NCFileSystemAdapterFactory.java     |   30 +-
 .../PullBasedAzureTwitterAdapterFactory.java    |   42 +-
 .../factory/PullBasedTwitterAdapterFactory.java |   83 +-
 .../factory/PushBasedTwitterAdapterFactory.java |   69 +
 .../adapter/factory/RSSFeedAdapterFactory.java  |   43 +-
 .../factory/StreamBasedAdapterFactory.java      |  159 +--
 .../dataset/adapter/ClientBasedFeedAdapter.java |  150 +++
 .../external/dataset/adapter/FeedClient.java    |  170 +++
 .../dataset/adapter/FileSystemBasedAdapter.java |   12 +-
 .../external/dataset/adapter/HDFSAdapter.java   |    8 +-
 .../dataset/adapter/HDFSIndexingAdapter.java    |    5 +-
 .../external/dataset/adapter/IFeedClient.java   |   42 +
 .../dataset/adapter/NCFileSystemAdapter.java    |   11 +-
 .../adapter/PullBasedAzureTwitterAdapter.java   |    7 +-
 .../dataset/adapter/PullBasedFeedClient.java    |  166 ---
 .../adapter/PullBasedTwitterAdapter.java        |   31 +-
 .../adapter/PullBasedTwitterFeedClient.java     |   77 +-
 .../adapter/PushBasedTwitterAdapter.java        |   52 +
 .../adapter/PushBasedTwitterFeedClient.java     |  118 ++
 .../dataset/adapter/RSSFeedAdapter.java         |   20 +-
 .../external/dataset/adapter/RSSFeedClient.java |    8 +-
 .../dataset/adapter/StreamBasedAdapter.java     |    4 +-
 .../dataflow/HDFSIndexingParserFactory.java     |    5 +-
 .../indexing/dataflow/HDFSLookupAdapter.java    |    9 +-
 .../external/library/ExternalFunction.java      |    4 +-
 .../library/ExternalFunctionProvider.java       |   15 +-
 .../external/library/IFunctionHelper.java       |    2 +
 .../external/library/JTypeObjectFactory.java    |    8 +-
 .../external/library/JavaFunctionHelper.java    |  179 +--
 .../ics/asterix/external/library/TypeInfo.java  |   49 +
 .../external/library/java/IJListAccessor.java   |   12 +
 .../asterix/external/library/java/IJObject.java |    8 +
 .../external/library/java/IJObjectAccessor.java |   11 +
 .../external/library/java/IJRecordAccessor.java |   15 +
 .../external/library/java/JObjectAccessors.java |  571 +++++++++
 .../library/java/JObjectPointableVisitor.java   |   75 ++
 .../external/library/java/JObjectUtil.java      |   10 +-
 .../asterix/external/library/java/JObjects.java |  858 ++++++++-----
 .../ics/asterix/external/util/Datatypes.java    |   21 +
 .../asterix/external/util/TweetProcessor.java   |   72 ++
 .../ics/asterix/external/util/TwitterUtil.java  |  143 +++
 .../external/library/AddHashTagsFactory.java    |   27 +
 .../external/library/AddHashTagsFunction.java   |   76 ++
 .../library/AddHashTagsInPlaceFactory.java      |   25 +
 .../library/AddHashTagsInPlaceFunction.java     |   54 +
 .../external/library/UpperCaseFunction.java     |    3 -
 .../library/adapter/TestTypedAdapter.java       |   12 +-
 .../adapter/TestTypedAdapterFactory.java        |   41 +-
 .../src/test/resources/library_descriptor.xml   |   76 ++
 .../src/test/resources/text_functions.xml       |   59 -
 .../installer/test/AsterixFaultToleranceIT.java |   82 --
 .../in1-cluster-restart.1.ddl.aql               |   28 -
 .../in1-cluster-restart.2.update.aql            |    3 -
 .../in1-cluster-restart.3.sleep.aql             |    1 -
 .../in1-cluster-restart.4.mgx.aql               |    1 -
 .../in1-cluster-restart.5.mgx.aql               |    1 -
 .../in1-cluster-restart.6.sleep.aql             |    1 -
 .../in1-cluster-restart.7.query.aql             |   10 -
 .../IN1-cluster-restart.1.adm                   |    2 -
 .../IN1-cluster-restart.2.adm                   |    2 -
 .../integrationts/fault-tolerance/testsuite.xml |   10 -
 .../functionDataset/functionDataset.1.adm       |   14 +-
 .../integrationts/library/testsuite.xml         |    2 +-
 .../uci/ics/asterix/metadata/MetadataCache.java |   46 +-
 .../ics/asterix/metadata/MetadataManager.java   |   79 +-
 .../uci/ics/asterix/metadata/MetadataNode.java  |  202 +--
 .../metadata/MetadataTransactionContext.java    |   15 +-
 .../metadata/api/IClusterEventsSubscriber.java  |   47 -
 .../metadata/api/IClusterManagementWork.java    |   29 -
 .../asterix/metadata/api/IClusterManager.java   |    1 +
 .../asterix/metadata/api/IMetadataManager.java  |   39 +-
 .../ics/asterix/metadata/api/IMetadataNode.java |   56 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |    4 +-
 .../bootstrap/MetadataPrimaryIndexes.java       |    1 -
 .../metadata/bootstrap/MetadataRecordTypes.java |   87 +-
 .../cluster/AbstractClusterManagementWork.java  |    6 +-
 .../asterix/metadata/cluster/AddNodeWork.java   |   36 +-
 .../cluster/ClusterManagementWorkResponse.java  |    4 +-
 .../metadata/cluster/ClusterManager.java        |    2 +-
 .../cluster/IClusterManagementWorkResponse.java |   28 -
 .../metadata/cluster/RemoveNodeWork.java        |    2 +-
 .../metadata/declared/AqlDataSource.java        |   17 +-
 .../metadata/declared/AqlMetadataProvider.java  |  257 ++--
 .../asterix/metadata/declared/AqlSourceId.java  |   12 +-
 .../metadata/declared/DatasetDataSource.java    |    2 +-
 .../metadata/declared/FeedDataSource.java       |   54 +-
 .../declared/FieldExtractingAdapter.java        |    2 +-
 .../declared/FieldExtractingAdapterFactory.java |   23 +-
 .../metadata/declared/LoadableDataSource.java   |    2 +-
 .../metadata/declared/PKGeneratingAdapter.java  |    2 +-
 .../declared/PKGeneratingAdapterFactory.java    |   21 +-
 .../uci/ics/asterix/metadata/entities/Feed.java |   99 +-
 .../asterix/metadata/entities/FeedActivity.java |  175 ---
 .../asterix/metadata/entities/PrimaryFeed.java  |   76 ++
 .../metadata/entities/SecondaryFeed.java        |   60 +
 .../DatasourceAdapterTupleTranslator.java       |    4 +-
 .../FeedActivityTupleTranslator.java            |  244 ----
 .../FeedTupleTranslator.java                    |  202 ++-
 .../metadata/external/IAdapterFactory.java      |   94 ++
 .../feeds/AbstractDatasourceAdapter.java        |   18 +-
 .../feeds/AbstractFeedDatasourceAdapter.java    |   20 +
 .../asterix/metadata/feeds/AdapterExecutor.java |   56 +
 .../metadata/feeds/AdapterIdentifier.java       |   22 +-
 .../metadata/feeds/AdapterRuntimeManager.java   |  149 +--
 .../metadata/feeds/BuiltinFeedPolicies.java     |  176 ++-
 .../feeds/CollectTransformFeedFrameWriter.java  |  102 ++
 .../ConditionalPushTupleParserFactory.java      |  213 ----
 .../ExternalDataScanOperatorDescriptor.java     |    2 +
 .../feeds/FeedCollectOperatorDescriptor.java    |  166 +++
 .../feeds/FeedCollectOperatorNodePushable.java  |  204 +++
 .../metadata/feeds/FeedConnectionManager.java   |  105 ++
 .../metadata/feeds/FeedFrameTupleDecorator.java |   90 ++
 .../asterix/metadata/feeds/FeedFrameWriter.java |  385 ------
 .../feeds/FeedIntakeOperatorDescriptor.java     |  199 +--
 .../feeds/FeedIntakeOperatorNodePushable.java   |  209 +--
 .../feeds/FeedLifecycleEventSubscriber.java     |   62 +
 .../ics/asterix/metadata/feeds/FeedManager.java |  143 ---
 .../feeds/FeedMessageOperatorDescriptor.java    |    9 +-
 .../feeds/FeedMessageOperatorNodePushable.java  |  289 ++++-
 .../feeds/FeedMetaComputeNodePushable.java      |  207 +++
 .../metadata/feeds/FeedMetaNodePushable.java    |  170 +++
 .../feeds/FeedMetaOperatorDescriptor.java       |  268 +---
 .../feeds/FeedMetaStoreNodePushable.java        |  198 +++
 .../metadata/feeds/FeedPolicyAccessor.java      |  105 --
 .../metadata/feeds/FeedPolicyEnforcer.java      |   46 +-
 .../ics/asterix/metadata/feeds/FeedReport.java  |  117 --
 .../metadata/feeds/FeedSubscriptionManager.java |   72 ++
 .../ics/asterix/metadata/feeds/FeedUtil.java    |  431 +++++--
 .../asterix/metadata/feeds/FeedWorkManager.java |   46 +
 .../asterix/metadata/feeds/IAdapterFactory.java |   91 --
 .../metadata/feeds/IDatasourceAdapter.java      |   49 -
 .../asterix/metadata/feeds/IFeedAdapter.java    |   40 -
 .../metadata/feeds/IFeedAdapterFactory.java     |   26 +
 .../metadata/feeds/IGenericAdapterFactory.java  |   32 -
 .../metadata/feeds/IPullBasedFeedAdapter.java   |    2 +
 .../metadata/feeds/ITypedAdapterFactory.java    |    1 +
 .../metadata/feeds/IngestionRuntime.java        |   34 -
 .../metadata/feeds/PrepareStallMessage.java     |   66 +
 .../feeds/RemoteSocketMessageListener.java      |    3 +-
 .../metadata/feeds/SocketMessageListener.java   |  158 +++
 .../feeds/TerminateDataFlowMessage.java         |   50 +
 .../asterix/metadata/feeds/XAQLFeedMessage.java |   64 +
 .../functions/MetadataBuiltinFunctions.java     |   14 +-
 .../metadata/utils/MetadataLockManager.java     |   48 +
 .../om/functions/AsterixBuiltinFunctions.java   |    8 +-
 .../asterix/om/pointables/ARecordPointable.java |    4 +
 .../asterix/om/util/AsterixAppContextInfo.java  |    8 +
 .../om/util/AsterixClusterProperties.java       |   12 +-
 .../runtime/formats/NonTaggedDataFormat.java    |   32 +-
 .../operators/file/AbstractTupleParser.java     |   32 +-
 .../file/AdmSchemafullRecordParserFactory.java  |   41 -
 .../runtime/operators/file/AdmTupleParser.java  |   36 -
 .../file/AsterixTupleParserFactory.java         |  254 ++++
 .../file/CounterTimerTupleForwardPolicy.java    |  140 +++
 .../file/DelimitedDataTupleParser.java          |   41 -
 .../file/FrameFullTupleForwardPolicy.java       |   58 +
 .../file/NtDelimitedDataTupleParserFactory.java |   52 -
 .../file/RateContolledParserPolicy.java         |   79 ++
 .../file/RateControlledTupleForwardPolicy.java  |   70 ++
 .../tools/external/data/DataGenerator.java      |  196 +--
 .../external/data/GenericSocketFeedAdapter.java |   22 +-
 .../data/GenericSocketFeedAdapterFactory.java   |   40 +-
 .../RateControlledFileSystemBasedAdapter.java   |    9 +-
 ...ControlledFileSystemBasedAdapterFactory.java |  197 +--
 .../external/data/SocketClientAdapter.java      |   33 +-
 .../data/SocketClientAdapterFactory.java        |   72 +-
 .../tools/external/data/TweetGenerator.java     |   96 +-
 .../data/TwitterFirehoseFeedAdapter.java        |   78 +-
 .../data/TwitterFirehoseFeedAdapterFactory.java |   80 +-
 384 files changed, 21795 insertions(+), 8594 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-algebra/pom.xml b/asterix-algebra/pom.xml
index 7c138b3..f3db662 100644
--- a/asterix-algebra/pom.xml
+++ b/asterix-algebra/pom.xml
@@ -1,18 +1,14 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- ! 
- !     http://www.apache.org/licenses/LICENSE-2.0
- ! 
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California 
+	! Licensed under the Apache License, Version 2.0 (the "License"); ! you may 
+	not use this file except in compliance with the License. ! you may obtain 
+	a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0 
+	! ! Unless required by applicable law or agreed to in writing, software ! 
+	distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the 
+	License for the specific language governing permissions and ! limitations 
+	under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<parent>
 		<artifactId>asterix</artifactId>
@@ -21,14 +17,14 @@
 	</parent>
 	<artifactId>asterix-algebra</artifactId>
 
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
+	<licenses>
+		<license>
+			<name>Apache License, Version 2.0</name>
+			<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+			<distribution>repo</distribution>
+			<comments>A business-friendly OSS license</comments>
+		</license>
+	</licenses>
 
 	<build>
 		<plugins>
@@ -61,7 +57,8 @@
 		</plugins>
 		<pluginManagement>
 			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<!--This plugin's configuration is used to store Eclipse m2e settings 
+					only. It has no influence on the Maven build itself. -->
 				<plugin>
 					<groupId>org.eclipse.m2e</groupId>
 					<artifactId>lifecycle-mapping</artifactId>
@@ -121,12 +118,12 @@
 			<version>0.8.7-SNAPSHOT</version>
 			<scope>compile</scope>
 		</dependency>
-                <dependency>
-                        <groupId>edu.uci.ics.asterix</groupId>
-                        <artifactId>asterix-transactions</artifactId>
-                        <version>0.8.7-SNAPSHOT</version>
-                        <scope>compile</scope>
-                </dependency>
+		<dependency>
+			<groupId>edu.uci.ics.asterix</groupId>
+			<artifactId>asterix-transactions</artifactId>
+			<version>0.8.7-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>algebricks-compiler</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
index 4411869..dcbc70c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -109,7 +109,7 @@ public class ExternalDataLookupPOperator extends AbstractScanPOperator {
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
             throws AlgebricksException {
-        AqlDataSource ds = new DatasetDataSource(datasetId, datasetId.getDataverseName(), datasetId.getDatasetName(),
+        AqlDataSource ds = new DatasetDataSource(datasetId, datasetId.getDataverseName(), datasetId.getDatasourceName(),
                 recordType, AqlDataSourceType.EXTERNAL_DATASET);
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/AnalysisUtil.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/AnalysisUtil.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/AnalysisUtil.java
index 83f137e..e89ff51 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/AnalysisUtil.java
@@ -125,7 +125,7 @@ public class AnalysisUtil {
 
     public static Pair<String, String> getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
         AqlSourceId srcId = (AqlSourceId) op.getDataSource().getId();
-        return new Pair<String, String>(srcId.getDataverseName(), srcId.getDatasetName());
+        return new Pair<String, String>(srcId.getDataverseName(), srcId.getDatasourceName());
     }
 
     private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<FunctionIdentifier>();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index b90442f..10ab856 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -18,6 +18,7 @@ package edu.uci.ics.asterix.optimizer.base;
 import java.util.LinkedList;
 import java.util.List;
 
+import edu.uci.ics.asterix.optimizer.rules.AddEquivalenceClassForRecordConstructorRule;
 import edu.uci.ics.asterix.optimizer.rules.AsterixInlineVariablesRule;
 import edu.uci.ics.asterix.optimizer.rules.AsterixIntroduceGroupByCombinerRule;
 import edu.uci.ics.asterix.optimizer.rules.ByNameToByIndexFieldAccessRule;
@@ -32,7 +33,6 @@ import edu.uci.ics.asterix.optimizer.rules.ExtractOrderExpressionsRule;
 import edu.uci.ics.asterix.optimizer.rules.FeedScanCollectionToUnnest;
 import edu.uci.ics.asterix.optimizer.rules.FuzzyEqRule;
 import edu.uci.ics.asterix.optimizer.rules.IfElseToSwitchCaseFunctionRule;
-import edu.uci.ics.asterix.optimizer.rules.AddEquivalenceClassForRecordConstructorRule;
 import edu.uci.ics.asterix.optimizer.rules.InlineUnnestFunctionRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceAutogenerateIDRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastForExternalFunctionRule;
@@ -40,6 +40,7 @@ import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceMaterializationForInsertWithSelfScanRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceRandomPartitioningFeedComputationRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectAssignRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastForInsertRule;
@@ -310,6 +311,7 @@ public final class RuleCollections {
         physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectAssignRule());
         physicalRewritesTopLevel.add(new SetExecutionModeRule());
+        physicalRewritesTopLevel.add(new IntroduceRandomPartitioningFeedComputationRule());
         return physicalRewritesTopLevel;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
new file mode 100644
index 0000000..c0ccd30
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import edu.uci.ics.asterix.metadata.declared.FeedDataSource;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (!op.getOperatorTag().equals(LogicalOperatorTag.ASSIGN)) {
+            return false;
+        }
+
+        ILogicalOperator opChild = op.getInputs().get(0).getValue();
+        if (!opChild.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) {
+            return false;
+        }
+
+        DataSourceScanOperator scanOp = (DataSourceScanOperator) opChild;
+        AqlDataSource dataSource = (AqlDataSource) scanOp.getDataSource();
+        if (!dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) {
+            return false;
+        }
+
+        final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
+        Feed feed = feedDataSource.getFeed();
+        if (feed.getAppliedFunction() == null) {
+            return false;
+        }
+
+        ExchangeOperator exchangeOp = new ExchangeOperator();
+        INodeDomain domain = new INodeDomain() {
+            @Override
+            public boolean sameAs(INodeDomain domain) {
+                return domain == this;
+            }
+
+            @Override
+            public Integer cardinality() {
+                return feedDataSource.getComputeCardinality();
+            }
+        };
+
+        exchangeOp.setPhysicalOperator(new RandomPartitionPOperator(domain));
+        op.getInputs().get(0).setValue(exchangeOp);
+        exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp));
+        ExecutionMode em = ((AbstractLogicalOperator) scanOp).getExecutionMode();
+        exchangeOp.setExecutionMode(em);
+        exchangeOp.computeDeliveredPhysicalProperties(context);
+        context.computeAndSetTypeEnvironmentForOperator(exchangeOp);
+
+        AssignOperator assignOp = (AssignOperator) opRef.getValue();
+        AssignPOperator assignPhyOp = (AssignPOperator) assignOp.getPhysicalOperator();
+        assignPhyOp.setCardinalityConstraint(domain.cardinality());
+
+        return true;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 8552c39..84f7230 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -136,7 +136,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
         AqlDataSource datasetSource = (AqlDataSource) insertOp.getDataSource();
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         String dataverseName = datasetSource.getId().getDataverseName();
-        String datasetName = datasetSource.getId().getDatasetName();
+        String datasetName = datasetSource.getId().getDatasourceName();
         Dataset dataset = mp.findDataset(dataverseName, datasetName);
         if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
index a0d268a..1fe0c6f 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/PushFieldAccessRule.java
@@ -121,9 +121,9 @@ public class PushFieldAccessRule implements IAlgebraicRewriteRule {
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         AqlSourceId asid = ((IDataSource<AqlSourceId>) scan.getDataSource()).getId();
 
-        Dataset dataset = mp.findDataset(asid.getDataverseName(), asid.getDatasetName());
+        Dataset dataset = mp.findDataset(asid.getDataverseName(), asid.getDatasourceName());
         if (dataset == null) {
-            throw new AlgebricksException("Dataset " + asid.getDatasetName() + " not found.");
+            throw new AlgebricksException("Dataset " + asid.getDatasourceName() + " not found.");
         }
         if (dataset.getDatasetType() != DatasetType.INTERNAL) {
             return false;
@@ -306,9 +306,9 @@ public class PushFieldAccessRule implements IAlgebraicRewriteRule {
                             }
                             AqlSourceId asid = dataSource.getId();
                             AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
-                            Dataset dataset = mp.findDataset(asid.getDataverseName(), asid.getDatasetName());
+                            Dataset dataset = mp.findDataset(asid.getDataverseName(), asid.getDatasourceName());
                             if (dataset == null) {
-                                throw new AlgebricksException("Dataset " + asid.getDatasetName() + " not found.");
+                                throw new AlgebricksException("Dataset " + asid.getDatasourceName() + " not found.");
                             }
                             if (dataset.getDatasetType() != DatasetType.INTERNAL) {
                                 setAsFinal(access, context, finalAnnot);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index 1104c53..78a327f 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -20,7 +20,8 @@ import java.util.List;
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.common.feeds.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
@@ -134,27 +135,23 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 return true;
             }
 
-            if (fid.equals(AsterixBuiltinFunctions.FEED_INGEST)) {
+            if (fid.equals(AsterixBuiltinFunctions.FEED_COLLECT)) {
                 if (unnest.getPositionalVariable() != null) {
                     throw new AlgebricksException("No positional variables are allowed over datasets.");
                 }
 
-                String feedArg = getStringArgument(f, 0);
-                String outputType = getStringArgument(f, 1);
-                String targetDataset = getStringArgument(f, 2);
+                String dataverse = getStringArgument(f, 0);
+                String sourceFeedName = getStringArgument(f, 1);
+                String getTargetFeed = getStringArgument(f, 2);
+                String subscriptionLocation = getStringArgument(f, 3);
+                String targetDataset = getStringArgument(f, 4);
+                String outputType = getStringArgument(f, 5);
 
                 AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
-                Pair<String, String> feedReference = parseDatasetReference(metadataProvider, feedArg);
-                String dataverseName = feedReference.first;
-                String feedName = feedReference.second;
-                Feed feed = metadataProvider.findFeed(dataverseName, feedName);
-                if (feed == null) {
-                    throw new AlgebricksException("Could not find feed " + feedName);
-                }
 
-                AqlSourceId asid = new AqlSourceId(dataverseName, feedName);
-                String policyName = metadataProvider.getConfig().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-                FeedPolicy policy = metadataProvider.findFeedPolicy(dataverseName, policyName);
+                AqlSourceId asid = new AqlSourceId(dataverse, getTargetFeed);
+                String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+                FeedPolicy policy = metadataProvider.findFeedPolicy(dataverse, policyName);
                 if (policy == null) {
                     policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
                     if (policy == null) {
@@ -165,9 +162,9 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
                 v.add(unnest.getVariable());
 
-                DataSourceScanOperator scan = new DataSourceScanOperator(v, createFeedDataSource(asid,
-                        new FeedConnectionId(dataverseName, feedName, targetDataset), metadataProvider, policy,
-                        outputType));
+                String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+                DataSourceScanOperator scan = new DataSourceScanOperator(v, createFeedDataSource(asid, targetDataset,
+                        sourceFeedName, subscriptionLocation, metadataProvider, policy, outputType, csLocations));
 
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
                 scanInpList.addAll(unnest.getInputs());
@@ -192,16 +189,20 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
         context.addPrimaryKey(pk);
     }
 
-    private AqlDataSource createFeedDataSource(AqlSourceId aqlId, FeedConnectionId feedId,
-            AqlMetadataProvider metadataProvider, FeedPolicy feedPolicy, String outputType) throws AlgebricksException {
+    private AqlDataSource createFeedDataSource(AqlSourceId aqlId, String targetDataset, String sourceFeedName,
+            String subscriptionLocation, AqlMetadataProvider metadataProvider, FeedPolicy feedPolicy,
+            String outputType, String locations) throws AlgebricksException {
         if (!aqlId.getDataverseName().equals(
                 metadataProvider.getDefaultDataverse() == null ? null : metadataProvider.getDefaultDataverse()
                         .getDataverseName())) {
             return null;
         }
-        IAType feedOutputType = metadataProvider.findType(feedId.getDataverse(), outputType);
-        FeedDataSource feedDataSource = new FeedDataSource(aqlId, feedId, feedOutputType,
-                AqlDataSource.AqlDataSourceType.FEED);
+        IAType feedOutputType = metadataProvider.findType(aqlId.getDataverseName(), outputType);
+        Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
+
+        FeedDataSource feedDataSource = new FeedDataSource(aqlId, targetDataset, feedOutputType,
+                AqlDataSource.AqlDataSourceType.FEED, sourceFeed.getFeedId(), sourceFeed.getFeedType(),
+                ConnectionLocation.valueOf(subscriptionLocation), locations.split(","));
         feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
         return feedDataSource;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
index f06784a..e00737e 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AbstractAqlTranslator.java
@@ -26,6 +26,7 @@ import edu.uci.ics.asterix.aql.expression.DeleteStatement;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.InsertStatement;
 import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
+import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
 import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
@@ -48,12 +49,12 @@ public abstract class AbstractAqlTranslator {
 
     public void validateOperation(Dataverse defaultDataverse, Statement stmt) throws AsterixException {
 
-        if (!(AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.ACTIVE) && AsterixClusterProperties.INSTANCE
+        if (!(AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE) && AsterixClusterProperties.INSTANCE
                 .isGlobalRecoveryCompleted())) {
             int maxWaitCycles = AsterixAppContextInfo.getInstance().getExternalProperties().getMaxWaitClusterActive();
             int waitCycleCount = 0;
             try {
-                while (!AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.ACTIVE)
+                while (!AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE)
                         && waitCycleCount < maxWaitCycles) {
                     Thread.sleep(1000);
                     waitCycleCount++;
@@ -61,21 +62,21 @@ public abstract class AbstractAqlTranslator {
             } catch (InterruptedException e) {
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Thread interrupted while waiting for cluster to be "
-                            + AsterixClusterProperties.State.ACTIVE);
+                            + ClusterState.ACTIVE);
                 }
             }
-            if (!AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.ACTIVE)) {
-                throw new AsterixException(" Asterix Cluster is in " + AsterixClusterProperties.State.UNUSABLE
+            if (!AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE)) {
+                throw new AsterixException(" Asterix Cluster is in " + ClusterState.UNUSABLE
                         + " state." + "\n One or more Node Controllers have left or haven't joined yet.\n");
             } else {
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Cluster is now " + AsterixClusterProperties.State.ACTIVE);
+                    LOGGER.info("Cluster is now " + ClusterState.ACTIVE);
                 }
             }
         }
 
-        if (AsterixClusterProperties.INSTANCE.getState().equals(AsterixClusterProperties.State.UNUSABLE)) {
-            throw new AsterixException(" Asterix Cluster is in " + AsterixClusterProperties.State.UNUSABLE + " state."
+        if (AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.UNUSABLE)) {
+            throw new AsterixException(" Asterix Cluster is in " + ClusterState.UNUSABLE + " state."
                     + "\n One or more Node Controllers have left.\n");
         }
 
@@ -94,7 +95,7 @@ public abstract class AbstractAqlTranslator {
             }
             if (!AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted()) {
                 throw new AsterixException(" Asterix Cluster Global recovery is not yet complete and The system is in "
-                        + AsterixClusterProperties.State.ACTIVE + " state");
+                        + ClusterState.ACTIVE + " state");
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
index ef9797d..0360ffb 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -31,9 +31,11 @@ import edu.uci.ics.asterix.aql.expression.CallExpr;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
@@ -43,6 +45,7 @@ import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -401,6 +404,14 @@ public class AqlExpressionToPlanTranslator extends AbstractAqlTranslator impleme
                     leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
                     break;
                 }
+                case SUBSCRIBE_FEED: { 
+                    ILogicalOperator insertOp = new InsertDeleteOperator(targetDatasource, varRef, varRefsForLoading,   
+                            InsertDeleteOperator.Kind.INSERT, false);   
+                    insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));  
+                    leafOperator = new SinkOperator();  
+                    leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));    
+                    break;  
+                }
             }
             topOp = leafOperator;
         }
@@ -1547,15 +1558,22 @@ public class AqlExpressionToPlanTranslator extends AbstractAqlTranslator impleme
         // TODO Auto-generated method stub
         return null;
     }
-
+    
     @Override
-    public Pair<ILogicalOperator, LogicalVariable> visitCreateFeedStatement(CreateFeedStatement del,
+    public Pair<ILogicalOperator, LogicalVariable> visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del,
             Mutable<ILogicalOperator> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
+    public Pair<ILogicalOperator, LogicalVariable> visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del,
+            Mutable<ILogicalOperator> arg) throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+ 
+    @Override
     public Pair<ILogicalOperator, LogicalVariable> visitConnectFeedStatement(ConnectFeedStatement del,
             Mutable<ILogicalOperator> arg) throws AsterixException {
         // TODO Auto-generated method stub
@@ -1575,4 +1593,16 @@ public class AqlExpressionToPlanTranslator extends AbstractAqlTranslator impleme
         // TODO Auto-generated method stub
         return null;
     }
+    
+    @Override
+    public Pair<ILogicalOperator, LogicalVariable> visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps,
+            Mutable<ILogicalOperator> arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public Pair<ILogicalOperator, LogicalVariable> visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs,
+            Mutable<ILogicalOperator> arg) throws AsterixException {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
index e21eb9b..aa071f6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlPlusExpressionToPlanTranslator.java
@@ -30,9 +30,12 @@ import edu.uci.ics.asterix.aql.expression.CallExpr;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
@@ -42,6 +45,7 @@ import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -1444,28 +1448,49 @@ public class AqlPlusExpressionToPlanTranslator extends AbstractAqlTranslator imp
     }
 
     @Override
-    public Pair<ILogicalOperator, LogicalVariable> visitCreateFeedStatement(CreateFeedStatement del,
+    public Pair<ILogicalOperator, LogicalVariable> visitConnectFeedStatement(ConnectFeedStatement del,
             Mutable<ILogicalOperator> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public Pair<ILogicalOperator, LogicalVariable> visitConnectFeedStatement(ConnectFeedStatement del,
+    public Pair<ILogicalOperator, LogicalVariable> visitDropFeedStatement(FeedDropStatement del,
             Mutable<ILogicalOperator> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public Pair<ILogicalOperator, LogicalVariable> visitDropFeedStatement(FeedDropStatement del,
+    public Pair<ILogicalOperator, LogicalVariable> visitCompactStatement(CompactStatement del,
+            Mutable<ILogicalOperator> arg) throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    
+    @Override
+    public Pair<ILogicalOperator, LogicalVariable> visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del,
             Mutable<ILogicalOperator> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public Pair<ILogicalOperator, LogicalVariable> visitCompactStatement(CompactStatement del,
+    public Pair<ILogicalOperator, LogicalVariable> visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del,
+            Mutable<ILogicalOperator> arg) throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<ILogicalOperator, LogicalVariable> visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps,
+            Mutable<ILogicalOperator> arg) throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<ILogicalOperator, LogicalVariable> visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs,
             Mutable<ILogicalOperator> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
index 6e9f197..c08f100 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
@@ -34,6 +34,7 @@ import edu.uci.ics.asterix.aql.expression.VariableExpr;
 import edu.uci.ics.asterix.aql.expression.WhereClause;
 import edu.uci.ics.asterix.aql.literal.StringLiteral;
 import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
 import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
@@ -385,6 +386,48 @@ public class CompiledStatements {
             return policyName;
         }
     }
+    
+    public static class CompiledSubscribeFeedStatement implements ICompiledDmlStatement {
+
+        private final FeedConnectionRequest request;
+        private Query query;
+        private final int varCounter;
+
+        public CompiledSubscribeFeedStatement(FeedConnectionRequest request, Query query, int varCounter) {
+            this.request = request;
+            this.query = query;
+            this.varCounter = varCounter;
+        }
+
+        @Override
+        public String getDataverseName() {
+            return request.getReceivingFeedId().getDataverse();
+        }
+
+        @Override
+        public String getDatasetName() {
+            return request.getTargetDataset();
+        }
+
+        public int getVarCounter() {
+            return varCounter;
+        }
+
+        public Query getQuery() {
+            return query;
+        }
+
+        public void setQuery(Query query) {
+            this.query = query;
+        }
+
+        @Override
+        public Kind getKind() {
+            return Kind.SUBSCRIBE_FEED;
+        }
+
+    }
+
 
     public static class CompiledDisconnectFeedStatement implements ICompiledDmlStatement {
         private String dataverseName;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/data/twitter/obamatweets_duplicate.adm
----------------------------------------------------------------------
diff --git a/asterix-app/data/twitter/obamatweets_duplicate.adm b/asterix-app/data/twitter/obamatweets_duplicate.adm
new file mode 100644
index 0000000..e8442e1
--- /dev/null
+++ b/asterix-app/data/twitter/obamatweets_duplicate.adm
@@ -0,0 +1,12 @@
+ { "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+ { "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson  uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+ { "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012" }
+ { "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012" }
+ { "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+ { "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+ { "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012" }
+ { "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+ { "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+ { "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+ { "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+ { "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 6409a13..6d7f2a4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -22,6 +22,7 @@ import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
+import edu.uci.ics.asterix.common.config.AsterixFeedProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -31,11 +32,11 @@ import edu.uci.ics.asterix.common.context.AsterixFileMapManager;
 import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.common.feeds.api.IFeedManager;
 import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
+import edu.uci.ics.asterix.feeds.FeedManager;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
-import edu.uci.ics.asterix.metadata.feeds.FeedManager;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
@@ -89,6 +90,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private AsterixMetadataProperties metadataProperties;
     private AsterixStorageProperties storageProperties;
     private AsterixTransactionProperties txnProperties;
+    private AsterixFeedProperties feedProperties;
+
 
     private AsterixThreadExecutor threadExecutor;
     private DatasetLifecycleManager indexLifecycleManager;
@@ -111,6 +114,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         metadataProperties = new AsterixMetadataProperties(ASTERIX_PROPERTIES_ACCESSOR);
         storageProperties = new AsterixStorageProperties(ASTERIX_PROPERTIES_ACCESSOR);
         txnProperties = new AsterixTransactionProperties(ASTERIX_PROPERTIES_ACCESSOR);
+        feedProperties = new AsterixFeedProperties(ASTERIX_PROPERTIES_ACCESSOR);
     }
 
     public void initialize() throws IOException, ACIDException, AsterixException {
@@ -147,7 +151,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         
         isShuttingdown = false;
 
-        feedManager = new FeedManager(ncApplicationContext.getNodeId());
+        feedManager = new FeedManager(ncApplicationContext.getNodeId(), feedProperties,
+                compilerProperties.getFrameSize());
 
         // The order of registration is important. The buffer cache must registered before recovery and transaction managers.
         ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
@@ -234,6 +239,11 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     public AsterixExternalProperties getExternalProperties() {
         return externalProperties;
     }
+    
+    @Override
+    public AsterixFeedProperties getFeedProperties() {
+        return feedProperties;
+    }
 
     @Override
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/FeedWorkCollection.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/FeedWorkCollection.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/FeedWorkCollection.java
new file mode 100644
index 0000000..e206d27
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/FeedWorkCollection.java
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.common;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.api.common.SessionConfig.OutputFormat;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.expression.SubscribeFeedStatement;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest.ConnectionStatus;
+import edu.uci.ics.asterix.common.feeds.api.IFeedWork;
+import edu.uci.ics.asterix.common.feeds.api.IFeedWorkEventListener;
+import edu.uci.ics.asterix.feeds.FeedCollectInfo;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * A collection of feed management related task, each represented as an implementation of {@code IFeedWork}.
+ */
+public class FeedWorkCollection {
+
+    private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName());
+
+    /**
+     * The task of subscribing to a feed to obtain data.
+     */
+    public static class SubscribeFeedWork implements IFeedWork {
+
+        private final Runnable runnable;
+
+        private final FeedConnectionRequest request;
+
+        @Override
+        public Runnable getRunnable() {
+            return runnable;
+        }
+
+        public SubscribeFeedWork(String[] locations, FeedConnectionRequest request) {
+            this.runnable = new SubscribeFeedWorkRunnable(locations, request);
+            this.request = request;
+        }
+
+        private static class SubscribeFeedWorkRunnable implements Runnable {
+
+            private final FeedConnectionRequest request;
+            private final String[] locations;
+
+            public SubscribeFeedWorkRunnable(String[] locations, FeedConnectionRequest request) {
+                this.request = request;
+                this.locations = locations;
+            }
+
+            @Override
+            public void run() {
+                try {
+                    PrintWriter writer = new PrintWriter(System.out, true);
+                    SessionConfig pc = new SessionConfig(writer, OutputFormat.ADM);
+                    DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(request.getReceivingFeedId()
+                            .getDataverse()));
+                    SubscribeFeedStatement subscribeStmt = new SubscribeFeedStatement(locations, request);
+                    List<Statement> statements = new ArrayList<Statement>();
+                    statements.add(dataverseDecl);
+                    statements.add(subscribeStmt);
+                    AqlTranslator translator = new AqlTranslator(statements, pc);
+                    translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, AqlTranslator.ResultDelivery.SYNC);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Submitted connection requests for execution: " + request);
+                    }
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Exception in executing " + request);
+                    }
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public static class FeedSubscribeWorkEventListener implements IFeedWorkEventListener {
+
+            @Override
+            public void workFailed(IFeedWork work, Exception e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request
+                            + " failed with exception " + e);
+                }
+            }
+
+            @Override
+            public void workCompleted(IFeedWork work) {
+                ((SubscribeFeedWork) work).request.setSubscriptionStatus(ConnectionStatus.ACTIVE);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.warning(" Feed subscription request " + ((SubscribeFeedWork) work).request + " completed ");
+                }
+            }
+
+        }
+
+        public FeedConnectionRequest getRequest() {
+            return request;
+        }
+
+        @Override
+        public String toString() {
+            return "SubscribeFeedWork for [" + request + "]";
+        }
+
+    }
+
+    /**
+     * The task of activating a set of feeds.
+     */
+    public static class ActivateFeedWork implements IFeedWork {
+
+        private final Runnable runnable;
+
+        @Override
+        public Runnable getRunnable() {
+            return runnable;
+        }
+
+        public ActivateFeedWork(List<FeedCollectInfo> feedsToRevive) {
+            this.runnable = new FeedsActivateRunnable(feedsToRevive);
+        }
+
+        public ActivateFeedWork() {
+            this.runnable = new FeedsActivateRunnable();
+        }
+
+        private static class FeedsActivateRunnable implements Runnable {
+
+            private List<FeedCollectInfo> feedsToRevive;
+            private Mode mode;
+
+            public enum Mode {
+                REVIVAL_POST_NODE_REJOIN
+            }
+
+            public FeedsActivateRunnable(List<FeedCollectInfo> feedsToRevive) {
+                this.feedsToRevive = feedsToRevive;
+            }
+
+            public FeedsActivateRunnable() {
+            }
+
+            @Override
+            public void run() {
+                switch (mode) {
+                    case REVIVAL_POST_NODE_REJOIN:
+                        try {
+                            Thread.sleep(10000);
+                        } catch (InterruptedException e1) {
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Attempt to resume feed interrupted");
+                            }
+                            throw new IllegalStateException(e1.getMessage());
+                        }
+                        for (FeedCollectInfo finfo : feedsToRevive) {
+                            try {
+                                JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                                if (LOGGER.isLoggable(Level.INFO)) {
+                                    LOGGER.info("Resumed feed :" + finfo.feedConnectionId + " job id " + jobId);
+                                    LOGGER.info("Job:" + finfo.jobSpec);
+                                }
+                            } catch (Exception e) {
+                                if (LOGGER.isLoggable(Level.WARNING)) {
+                                    LOGGER.warning("Unable to resume feed " + finfo.feedConnectionId + " "
+                                            + e.getMessage());
+                                }
+                            }
+                        }
+                }
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
index c5fb76b..7559326 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -28,6 +28,7 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.asterix.feeds.CentralFeedManager;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
@@ -76,7 +77,7 @@ public class ConnectorAPIServlet extends HttpServlet {
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
 
             // Retrieves file splits of the dataset.
-            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null);
+            AqlMetadataProvider metadataProvider = new AqlMetadataProvider(null, CentralFeedManager.getInstance());
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
             if (dataset == null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae85a1dc/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/DDLAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/DDLAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/DDLAPIServlet.java
index 51af387..555ee67 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/DDLAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/DDLAPIServlet.java
@@ -33,7 +33,7 @@ public class DDLAPIServlet extends RESTAPIServlet {
         Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DATAVERSE_DROP, Kind.DATASET_DECL, Kind.NODEGROUP_DECL,
                 Kind.NODEGROUP_DROP, Kind.TYPE_DECL, Kind.TYPE_DROP, Kind.CREATE_INDEX, Kind.INDEX_DECL,
                 Kind.CREATE_DATAVERSE, Kind.DATASET_DROP, Kind.INDEX_DROP, Kind.CREATE_FUNCTION, Kind.FUNCTION_DROP,
-                Kind.CREATE_FEED };
+                Kind.CREATE_PRIMARY_FEED, Kind.CREATE_SECONDARY_FEED, Kind.DROP_FEED, Kind.CREATE_FEED_POLICY, Kind.DROP_FEED_POLICY };
         return Arrays.asList(statementsArray);
     }