You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by xi...@apache.org on 2018/02/23 18:41:02 UTC

asterixdb git commit: [ASTERIXDB-2227][ING] Enabling filtering of incoming data in feed

Repository: asterixdb
Updated Branches:
  refs/heads/master 172ce0b89 -> f674168df


[ASTERIXDB-2227][ING] Enabling filtering of incoming data in feed

- user model changes: yes
  Add syntax support for specifying a predicate in the connect feed statement
- storage format changes: no
- interface changes: no

Details:
In some use cases, a user may want to filter the incoming data on
certain attributes. For example, a user may want to store only the
incoming tweets that have geo locations. This patch enables the <WHERE>
clause in the connect feed statement. Users can select a subset of the
incoming data using the following syntax:

 connect feed feeds.TweetFeed to  dataset Tweets3 using policy `Basic`
 WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike';

Change-Id: I0b3cc6fe9d7fb5f5645dd9c759da448bfe1e88f1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2255
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f674168d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f674168d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f674168d

Branch: refs/heads/master
Commit: f674168dfde9c05700ea3b68e0b95594797d682b
Parents: 172ce0b
Author: Xikui Wang <xk...@gmail.com>
Authored: Thu Feb 22 19:17:17 2018 -0800
Committer: Xikui Wang <xk...@gmail.com>
Committed: Fri Feb 23 10:40:40 2018 -0800

----------------------------------------------------------------------
 .../asterix/app/translator/QueryTranslator.java |  3 +-
 .../apache/asterix/utils/FeedOperations.java    | 22 +++++++++-
 .../feeds/feeds_13/feeds_13.1.ddl.sqlpp         | 44 ++++++++++++++++++++
 .../feeds/feeds_13/feeds_13.2.update.sqlpp      | 27 ++++++++++++
 .../feeds/feeds_13/feeds_13.3.query.sqlpp       | 33 +++++++++++++++
 .../results/feeds/feeds_13/feeds_13.1.adm       | 15 +++++++
 .../resources/runtimets/testsuite_sqlpp.xml     |  5 +++
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |  2 +-
 .../common/statement/ConnectFeedStatement.java  |  8 +++-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj | 28 ++++++++++++-
 .../metadata/MetadataTransactionContext.java    |  3 +-
 .../metadata/bootstrap/MetadataRecordTypes.java |  1 +
 .../metadata/entities/FeedConnection.java       |  8 +++-
 .../FeedConnectionTupleTranslator.java          | 26 +++++++++++-
 14 files changed, 215 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b64f828..aabb7c2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2181,6 +2181,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
         String policyName = cfs.getPolicy();
+        String whereClauseBody = cfs.getWhereClauseBody();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         // TODO: Check whether we are connecting a change feed to a non-meta dataset
@@ -2213,7 +2214,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             if (fc != null) {
                 throw new AlgebricksException("Feed" + feedName + " is already connected dataset " + datasetName);
             }
-            fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName,
+            fc = new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody,
                     outputType.getTypeName());
             MetadataManager.INSTANCE.addFeedConnection(metadataProvider.getMetadataTxnContext(), fc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index b6371dc..424444a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -40,6 +40,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.TxnId;
@@ -58,8 +59,11 @@ import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.clause.LetClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -78,6 +82,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectClause;
 import org.apache.asterix.lang.sqlpp.clause.SelectElement;
 import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
+import org.apache.asterix.lang.sqlpp.parser.SqlppParserFactory;
 import org.apache.asterix.lang.sqlpp.struct.SetOperationInput;
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -192,7 +197,7 @@ public class FeedOperations {
         return argExprs;
     }
 
-    private static Query makeConnectionQuery(FeedConnection feedConnection) {
+    private static Query makeConnectionQuery(FeedConnection feedConnection) throws AlgebricksException {
         // Construct from clause
         VarIdentifier fromVarId = SqlppVariableUtil.toInternalVariableIdentifier(feedConnection.getFeedName());
         VariableExpr fromTermLeftExpr = new VariableExpr(fromVarId);
@@ -204,6 +209,19 @@ public class FeedOperations {
         CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
         FromTerm fromterm = new FromTerm(datasrouceCallFunction, fromTermLeftExpr, null, null);
         FromClause fromClause = new FromClause(Arrays.asList(fromterm));
+        WhereClause whereClause = null;
+        if (feedConnection.getWhereClauseBody().length() != 0) {
+            String whereClauseExpr = feedConnection.getWhereClauseBody() + ";";
+            IParserFactory sqlppParserFactory = new SqlppParserFactory();
+            IParser sqlppParser = sqlppParserFactory.createParser(whereClauseExpr);
+            List<Statement> stmts = sqlppParser.parse();
+            if (stmts.size() != 1) {
+                throw new CompilationException("Exceptions happened in processing where clause.");
+            }
+            Query whereClauseQuery = (Query) stmts.get(0);
+            whereClause = new WhereClause(whereClauseQuery.getBody());
+        }
+
         // TODO: This can be the place to add select predicate for ingestion
         // Attaching functions
         int varIdx = 1;
@@ -222,7 +240,7 @@ public class FeedOperations {
         // Constructing select clause
         SelectElement selectElement = new SelectElement(previousVarExpr);
         SelectClause selectClause = new SelectClause(selectElement, null, false);
-        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, null, null, null, null);
+        SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, letClauses, whereClause, null, null, null);
         SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
         SelectExpression body = new SelectExpression(null, selectSetOperation, null, null, true);
         Query query = new Query(false, true, body, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
new file mode 100644
index 0000000..412ce03
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse feeds if exists;
+create  dataverse feeds;
+use feeds;
+
+create type feeds.TweetType as
+ closed {
+  id : string,
+  username : string,
+  location : string,
+  text : string,
+  timestamp : string
+};
+
+create dataset Tweets1(TweetType) primary key id;
+create dataset Tweets2(TweetType) primary key id;
+create dataset Tweets3(TweetType) primary key id;
+create dataset Tweets4(TweetType) primary key id;
+
+create feed TweetFeed with {
+  "adapter-name" : "localfs",
+  "path":"asterix_nc1://data/twitter/obamatweets.adm",
+  "format":"adm",
+  "type-name":"TweetType",
+  "tuple-interval":"10"
+};

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
new file mode 100644
index 0000000..dd83b35
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+set `wait-for-completion-feed` `true`;
+connect feed feeds.TweetFeed to  dataset Tweets1 using policy `Basic` WHERE id = 'nc1:115';
+connect feed feeds.TweetFeed to  dataset Tweets2 using policy `Basic` WHERE id LIKE 'nc1:11%';
+connect feed feeds.TweetFeed to  dataset Tweets3 using policy `Basic` WHERE id NOT LIKE 'nc1:10%' OR username = 'BronsonMike';
+connect feed feeds.TweetFeed to  dataset Tweets4 using policy `Basic` WHERE id LIKE 'nc1:11%' AND username = 'thewildpitch';
+
+start feed feeds.TweetFeed;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
new file mode 100644
index 0000000..20c6320
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_13/feeds_13.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use feeds;
+
+select value x
+from  Tweets1 as x
+union all
+select value x2
+from  Tweets2 as x2
+union all
+select value x3
+from  Tweets3 as x3
+union all
+select value x4
+from  Tweets4 as x4
+order by id;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
new file mode 100644
index 0000000..2c37364
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_13/feeds_13.1.adm
@@ -0,0 +1,15 @@
+{ "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ...  #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+{ "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }
+{ "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012" }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index bb0d473..96dbf01 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8483,6 +8483,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="feeds_13">
+        <output-dir compare="Text">feeds_13</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="issue_230_feeds">
         <output-dir compare="Text">issue_230_feeds</output-dir>
       </compilation-unit>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 02d2220..74fe907 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1166,7 +1166,7 @@ Statement FeedStatement() throws ParseException:
     <CONNECT> <FEED> feedNameComponents = QualifiedName() <TO> <DATASET> datasetNameComponents = QualifiedName()
     (ApplyFunction(appliedFunctions))? (policy = GetPolicy())?
       {
-        stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, policy, getVarCounter());
+        stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions, policy, null, getVarCounter());
       }
     | <DISCONNECT> <FEED> feedNameComponents = QualifiedName() <FROM> <DATASET> datasetNameComponents = QualifiedName()
       {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 3b6a1c3..b0a3f6e 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -34,11 +34,12 @@ public class ConnectFeedStatement implements Statement {
     private final Identifier datasetName;
     private final String feedName;
     private final String policy;
+    private final String whereClauseBody;
     private int varCounter;
     private final List<FunctionSignature> appliedFunctions;
 
     public ConnectFeedStatement(Pair<Identifier, Identifier> feedNameCmp, Pair<Identifier, Identifier> datasetNameCmp,
-            List<FunctionSignature> appliedFunctions, String policy, int varCounter) {
+            List<FunctionSignature> appliedFunctions, String policy, String whereClauseBody, int varCounter) {
         if (feedNameCmp.first != null && datasetNameCmp.first != null
                 && !feedNameCmp.first.getValue().equals(datasetNameCmp.first.getValue())) {
             throw new IllegalArgumentException("Dataverse for source feed and target dataset do not match");
@@ -48,6 +49,7 @@ public class ConnectFeedStatement implements Statement {
         this.datasetName = datasetNameCmp.second;
         this.feedName = feedNameCmp.second.getValue();
         this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
+        this.whereClauseBody = whereClauseBody;
         this.varCounter = varCounter;
         this.appliedFunctions = appliedFunctions;
     }
@@ -64,6 +66,10 @@ public class ConnectFeedStatement implements Statement {
         return varCounter;
     }
 
+    public String getWhereClauseBody() {
+        return whereClauseBody;
+    }
+
     @Override
     public Kind getKind() {
         return Statement.Kind.CONNECT_FEED;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 7a99814..42b8d15 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -157,6 +157,7 @@ import org.apache.asterix.lang.sqlpp.clause.SelectElement;
 import org.apache.asterix.lang.sqlpp.clause.SelectRegular;
 import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
 import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.sqlpp.expression.CaseExpression;
 import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.optype.JoinType;
@@ -1264,14 +1265,37 @@ Statement ConnectStatement() throws ParseException:
   List<FunctionSignature> appliedFunctions = new ArrayList<FunctionSignature>();
   Statement stmt = null;
   String policy = null;
+  String whereClauseBody = null;
+  WhereClause whereClause = null;
+  Token beginPos = null;
+  Token endPos = null;
 }
 {
   (
     <FEED> feedNameComponents = QualifiedName() <TO> Dataset() datasetNameComponents = QualifiedName()
-    (ApplyFunction(appliedFunctions))?  (policy = GetPolicy())?
+    (ApplyFunction(appliedFunctions))?
+    (policy = GetPolicy())?
+    (
+      <WHERE>
+      {
+        beginPos = token;
+        whereClause = new WhereClause();
+        Expression whereExpr;
+      }
+      whereExpr = Expression()
+      {
+        whereClause.setWhereExpr(whereExpr);
+      }
+    )?
+    {
+      if (whereClause != null) {
+        endPos = token;
+        whereClauseBody = extractFragment(beginPos.endLine, beginPos.endColumn, endPos.endLine, endPos.endColumn + 1);
+      }
+    }
       {
         stmt = new ConnectFeedStatement(feedNameComponents, datasetNameComponents, appliedFunctions,
-         policy, getVarCounter());
+         policy, whereClauseBody, getVarCounter());
       }
   )
   {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index a1eb425..367f568 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -242,7 +242,8 @@ public class MetadataTransactionContext extends MetadataCache {
     }
 
     public void dropFeedConnection(String dataverseName, String feedName, String datasetName) {
-        FeedConnection feedConnection = new FeedConnection(dataverseName, feedName, datasetName, null, null, null);
+        FeedConnection feedConnection =
+                new FeedConnection(dataverseName, feedName, datasetName, null, null, null, null);
         droppedCache.addFeedConnectionIfNotExists(feedConnection);
         logAndApply(new MetadataLogicalOperation(feedConnection, false));
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 54a69eb..ba1ea03 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -100,6 +100,7 @@ public final class MetadataRecordTypes {
     public static final String FIELD_NAME_VALUE = "Value";
     public static final String FIELD_NAME_WORKING_MEMORY_SIZE = "WorkingMemorySize";
     public static final String FIELD_NAME_APPLIED_FUNCTIONS = "AppliedFunctions";
+    public static final String FIELD_NAME_WHERE_CLAUSE = "WhereClause";
 
     //---------------------------------- Record Types Creation ----------------------------------//
     //--------------------------------------- Properties ----------------------------------------//

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 7572a9a..78d6e4e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -40,17 +40,19 @@ public class FeedConnection implements IMetadataEntity<FeedConnection> {
     private String feedName;
     private String datasetName;
     private String policyName;
+    private String whereClauseBody;
     private String outputType;
     private List<FunctionSignature> appliedFunctions;
 
     public FeedConnection(String dataverseName, String feedName, String datasetName,
-            List<FunctionSignature> appliedFunctions, String policyName, String outputType) {
+            List<FunctionSignature> appliedFunctions, String policyName, String whereClauseBody, String outputType) {
         this.dataverseName = dataverseName;
         this.feedName = feedName;
         this.datasetName = datasetName;
         this.appliedFunctions = appliedFunctions;
         this.connectionId = feedName + ":" + datasetName;
         this.policyName = policyName;
+        this.whereClauseBody = whereClauseBody == null ? "" : whereClauseBody;
         this.outputType = outputType;
         this.feedId = new EntityId(FeedUtils.FEED_EXTENSION_NAME, dataverseName, feedName);
     }
@@ -105,6 +107,10 @@ public class FeedConnection implements IMetadataEntity<FeedConnection> {
         return policyName;
     }
 
+    public String getWhereClauseBody() {
+        return whereClauseBody;
+    }
+
     public String getOutputType() {
         return outputType;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f674168d/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
index 269497b..61a8ab2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FeedConnectionTupleTranslator.java
@@ -22,6 +22,7 @@ package org.apache.asterix.metadata.entitytupletranslators;
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -32,6 +33,7 @@ import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
 import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.ARecord;
@@ -53,6 +55,8 @@ public class FeedConnectionTupleTranslator extends AbstractTupleTranslator<FeedC
 
     public static final int FEED_CONN_PAYLOAD_TUPLE_FIELD_INDEX = 3;
 
+    protected final transient ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
+
     private ISerializerDeserializer<ARecord> recordSerDes = SerializerDeserializerProvider.INSTANCE
             .getSerializerDeserializer(MetadataRecordTypes.FEED_CONNECTION_RECORDTYPE);
 
@@ -101,7 +105,12 @@ public class FeedConnectionTupleTranslator extends AbstractTupleTranslator<FeedC
             }
         }
 
-        return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, outputType);
+        int whereClauseIdx = feedConnRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE);
+        String whereClauseBody =
+                whereClauseIdx >= 0 ? ((AString) feedConnRecord.getValueByPos(whereClauseIdx)).getStringValue() : "";
+
+        return new FeedConnection(dataverseName, feedName, datasetName, appliedFunctions, policyName, whereClauseBody,
+                outputType);
     }
 
     @Override
@@ -159,6 +168,9 @@ public class FeedConnectionTupleTranslator extends AbstractTupleTranslator<FeedC
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.FEED_CONN_POLICY_FIELD_INDEX, fieldValue);
 
+        // field: whereClauseBody
+        writeOpenPart(me);
+
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
         tupleBuilder.addFieldEndOffset();
 
@@ -166,6 +178,18 @@ public class FeedConnectionTupleTranslator extends AbstractTupleTranslator<FeedC
         return tuple;
     }
 
+    protected void writeOpenPart(FeedConnection fc) throws HyracksDataException {
+        if (fc.getWhereClauseBody() != null && fc.getWhereClauseBody().length() > 0) {
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.FIELD_NAME_WHERE_CLAUSE);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aString.setValue(fc.getWhereClauseBody());
+            stringSerde.serialize(aString, fieldValue.getDataOutput());
+            recordBuilder.addField(fieldName, fieldValue);
+        }
+    }
+
     private void writeAppliedFunctionsField(IARecordBuilder rb, FeedConnection fc, ArrayBackedValueStorage buffer)
             throws HyracksDataException {
         UnorderedListBuilder listBuilder = new UnorderedListBuilder();