You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2020/11/16 21:14:37 UTC

[asterixdb] branch master updated: [NO ISSUE][COMP] LIMIT pushdown for external scan

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f081bfa  [NO ISSUE][COMP] LIMIT pushdown for external scan
f081bfa is described below

commit f081bfa91ea3e90c774b91934a23ba1d240ed8e0
Author: Dmitry Lychagin <dm...@couchbase.com>
AuthorDate: Fri Nov 13 11:52:02 2020 -0800

    [NO ISSUE][COMP] LIMIT pushdown for external scan
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Push LIMIT into external scan operator
    
    Change-Id: I51d079f3fc190286a1c108be6a19ce7cffb7f8a1
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8884
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
---
 .../rules/PushLimitIntoPrimarySearchRule.java      |  3 +-
 ...push-limit-to-external-scan-select.1.ddl.sqlpp} | 23 ++------
 ...sh-limit-to-external-scan-select.2.query.sqlpp} | 25 ++------
 ...sh-limit-to-external-scan-select.3.query.sqlpp} | 24 ++------
 .../push-limit-to-external-scan.1.ddl.sqlpp}       | 23 ++------
 .../push-limit-to-external-scan.2.query.sqlpp}     | 24 ++------
 .../push-limit-to-external-scan.3.query.sqlpp}     | 23 ++------
 .../push-limit-to-external-scan-select.2.adm       | 22 +++++++
 .../push-limit-to-external-scan-select.3.adm       |  5 ++
 .../push-limit-to-external-scan.2.adm              | 22 +++++++
 .../push-limit-to-external-scan.3.adm              |  5 ++
 .../test/resources/runtimets/testsuite_sqlpp.xml   | 10 ++++
 .../common/external/IDataSourceAdapter.java        | 27 ++++++++-
 .../asterix/external/api/IDataFlowController.java  |  4 +-
 .../dataflow/FeedRecordDataFlowController.java     |  7 ++-
 .../dataflow/FeedStreamDataFlowController.java     |  8 ++-
 .../dataflow/RecordDataFlowController.java         | 19 +++++-
 .../dataflow/StreamDataFlowController.java         | 19 +++++-
 .../external/dataset/adapter/FeedAdapter.java      |  6 +-
 .../external/dataset/adapter/GenericAdapter.java   |  6 +-
 .../operators/ExternalScanOperatorDescriptor.java  | 26 +++++++--
 .../external/library/adapter/TestTypedAdapter.java | 10 +++-
 .../adapter/TestTypedFeedDataFlowController.java   |  3 +-
 .../metadata/declared/DatasetDataSource.java       |  9 +--
 .../metadata/declared/FunctionDataSource.java      |  9 +--
 .../metadata/declared/MetadataProvider.java        |  7 ++-
 .../dataflow/IndexSearchOperatorNodePushable.java  | 56 ++----------------
 .../tuples/ReferenceFrameTupleReference.java       | 68 ++++++++++++++++++++++
 ...MBTreeBatchPointSearchOperatorNodePushable.java |  7 ++-
 29 files changed, 298 insertions(+), 202 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
index b7eb8b3..22dad3a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoPrimarySearchRule.java
@@ -205,7 +205,8 @@ public class PushLimitIntoPrimarySearchRule implements IAlgebraicRewriteRule {
                 && op.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
             return false;
         }
-        if (((DataSource) op.getDataSource()).getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+        byte datasourceType = ((DataSource) op.getDataSource()).getDatasourceType();
+        if (datasourceType != DataSource.Type.INTERNAL_DATASET && datasourceType != DataSource.Type.EXTERNAL_DATASET) {
             return false;
         }
         if (!op.getScanVariables().containsAll(selectedVariables)) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
similarity index 56%
copy from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
index 708cdd8..5dcc87b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.1.ddl.sqlpp
@@ -16,24 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.library.adapter;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
 
-class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
-    }
+USE test;
 
-    @Override
-    public String getStats() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void start(IFrameWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-}
+CREATE EXTERNAL DATASET ds1(f1 INTEGER, f2 BOOLEAN, f3 STRING)
+  USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"), ("null"=""));
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
similarity index 56%
copy from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
index 708cdd8..8a71219 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.query.sqlpp
@@ -16,24 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.library.adapter;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+USE test;
 
-class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
-    }
-
-    @Override
-    public String getStats() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void start(IFrameWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-}
+EXPLAIN
+SELECT VALUE f1
+FROM ds1 t
+WHERE f1 > 2
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
similarity index 56%
copy from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
index 708cdd8..fe7ffbf 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.query.sqlpp
@@ -16,24 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.library.adapter;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+USE test;
 
-class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
-    }
-
-    @Override
-    public String getStats() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void start(IFrameWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-}
+SELECT VALUE f1
+FROM ds1 t
+WHERE f1 > 2
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
similarity index 56%
copy from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
index 708cdd8..5dcc87b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.1.ddl.sqlpp
@@ -16,24 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.library.adapter;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
 
-class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
-    }
+USE test;
 
-    @Override
-    public String getStats() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void start(IFrameWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-}
+CREATE EXTERNAL DATASET ds1(f1 INTEGER, f2 BOOLEAN, f3 STRING)
+  USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"), ("null"=""));
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
similarity index 56%
copy from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
index 708cdd8..cf98178 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.query.sqlpp
@@ -16,24 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.library.adapter;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+USE test;
 
-class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
-    }
-
-    @Override
-    public String getStats() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void start(IFrameWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-}
+EXPLAIN
+SELECT VALUE f1
+FROM ds1 t
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
similarity index 56%
copy from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
index 708cdd8..0c52414 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.query.sqlpp
@@ -16,24 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.library.adapter;
 
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
+USE test;
 
-class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
-    }
-
-    @Override
-    public String getStats() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void start(IFrameWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-}
+SELECT VALUE f1
+FROM ds1 t
+LIMIT 5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm
new file mode 100644
index 0000000..b38ed8b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.2.adm
@@ -0,0 +1,22 @@
+distribute result [$$16]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 5
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        limit 5
+        -- STREAM_LIMIT  |PARTITIONED|
+          project ([$$16])
+          -- STREAM_PROJECT  |PARTITIONED|
+            assign [$$16] <- [$$t.getField(0)]
+            -- ASSIGN  |PARTITIONED|
+              exchange
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$t] <- test.ds1 condition (gt($$t.getField(0), 2)) limit 5
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm
new file mode 100644
index 0000000..2288987
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan-select/push-limit-to-external-scan-select.3.adm
@@ -0,0 +1,5 @@
+3
+4
+5
+6
+7
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm
new file mode 100644
index 0000000..2c98237
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.2.adm
@@ -0,0 +1,22 @@
+distribute result [$$13]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 5
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        project ([$$13])
+        -- STREAM_PROJECT  |PARTITIONED|
+          assign [$$13] <- [$$t.getField(0)]
+          -- ASSIGN  |PARTITIONED|
+            limit 5
+            -- STREAM_LIMIT  |PARTITIONED|
+              exchange
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$t] <- test.ds1 limit 5
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm
new file mode 100644
index 0000000..85954ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-external-scan/push-limit-to-external-scan.3.adm
@@ -0,0 +1,5 @@
+1
+2
+3
+4
+5
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 63912d6..a99b121 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -13944,6 +13944,16 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="limit">
+      <compilation-unit name="push-limit-to-external-scan">
+        <output-dir compare="Text">push-limit-to-external-scan</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="limit">
+      <compilation-unit name="push-limit-to-external-scan-select">
+        <output-dir compare="Text">push-limit-to-external-scan-select</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="limit">
       <compilation-unit name="push-limit-to-primary-scan">
         <output-dir compare="Text">push-limit-to-primary-scan</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
index 8fc70b8..143b805 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
@@ -20,6 +20,7 @@ package org.apache.asterix.common.external;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 /**
  * A super interface implemented by a data source adapter. An adapter can be a
@@ -46,9 +47,33 @@ public interface IDataSourceAdapter {
      *            write frame to. Adapter packs the fetched bytes (from external source),
      *            packs them into frames and forwards the frames to an upstream receiving
      *            operator using the instance of IFrameWriter.
+     * @param tupleFilter
+     *            If not {@code null} then only tuples matching this filter should be emitted
+     * @param outputLimit
+     *            Limits the number of tuples that should be emitted
+     *            (after applying the filter if it's specified).
+     *            {@code -1} if there is no limit.
      * @throws Exception
      */
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException;
+    void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException;
+
+    /**
+     * Triggers the adapter to begin ingesting data from the external source.
+     *
+     * @param partition
+     *            The adapter could be running with a degree of parallelism.
+     *            partition corresponds to the i'th parallel instance.
+     * @param writer
+     *            The instance of frame writer that is used by the adapter to
+     *            write frame to. Adapter packs the fetched bytes (from external source),
+     *            packs them into frames and forwards the frames to an upstream receiving
+     *            operator using the instance of IFrameWriter.
+     * @throws Exception
+     */
+    default void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
+        start(partition, writer, null, -1);
+    }
 
     /**
      * @return The number of processed tuples by this adapter
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index ccc420b..527e342 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -22,11 +22,13 @@ import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 @FunctionalInterface
 public interface IDataFlowController {
 
-    public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException;
 
     public default boolean pause() throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 7a089b8..efb0f13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,7 +68,11 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException {
+        if (tupleFilter != null || outputLimit >= 0) {
+            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+        }
         synchronized (this) {
             if (state == State.STOPPED) {
                 return;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 4deb422..b42e6de 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -18,12 +18,15 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 public class FeedStreamDataFlowController extends AbstractFeedDataFlowController {
 
@@ -39,7 +42,10 @@ public class FeedStreamDataFlowController extends AbstractFeedDataFlowController
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
+        if (tupleFilter != null || outputLimit >= 0) {
+            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+        }
         try {
             tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 2c19f9d..ce49ccf8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -26,6 +26,9 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
 
 public class RecordDataFlowController<T> extends AbstractDataFlowController {
 
@@ -42,17 +45,29 @@ public class RecordDataFlowController<T> extends AbstractDataFlowController {
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
         try {
             processedTuples = 0;
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
+            boolean tupleFilterExists = tupleFilter != null;
+            ArrayTupleReference tupleRef = tupleFilterExists ? new ArrayTupleReference() : null;
+            ReferenceFrameTupleReference frameTupleRef = tupleFilterExists ? new ReferenceFrameTupleReference() : null;
             TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
-            while (recordReader.hasNext()) {
+            while ((outputLimit < 0 || processedTuples < outputLimit) && recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 tb.reset();
                 if (dataParser.parse(record, tb.getDataOutput())) {
                     tb.addFieldEndOffset();
                     appendOtherTupleFields(tb);
+
+                    if (tupleFilterExists) {
+                        tupleRef.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+                        frameTupleRef.reset(tupleRef);
+                        if (!tupleFilter.accept(frameTupleRef)) {
+                            continue;
+                        }
+                    }
+
                     tupleForwarder.addTuple(tb);
                     processedTuples++;
                 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 9c11c97..85320e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -23,6 +23,9 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
 
 public class StreamDataFlowController extends AbstractDataFlowController {
     private final IStreamDataParser dataParser;
@@ -33,17 +36,29 @@ public class StreamDataFlowController extends AbstractDataFlowController {
     }
 
     @Override
-    public void start(IFrameWriter writer) throws HyracksDataException {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) throws HyracksDataException {
         try {
             processedTuples = 0;
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+            boolean tupleFilterExists = tupleFilter != null;
+            ArrayTupleReference tupleRef = tupleFilterExists ? new ArrayTupleReference() : null;
+            ReferenceFrameTupleReference frameTupleRef = tupleFilterExists ? new ReferenceFrameTupleReference() : null;
             TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
-            while (true) {
+            while (outputLimit < 0 || processedTuples < outputLimit) {
                 tb.reset();
                 if (!dataParser.parse(tb.getDataOutput())) {
                     break;
                 }
                 tb.addFieldEndOffset();
+
+                if (tupleFilterExists) {
+                    tupleRef.reset(tb.getFieldEndOffsets(), tb.getByteArray());
+                    frameTupleRef.reset(tupleRef);
+                    if (!tupleFilter.accept(frameTupleRef)) {
+                        continue;
+                    }
+                }
+
                 tupleForwarder.addTuple(tb);
                 processedTuples++;
             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 123a552..54e633a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -25,6 +25,7 @@ import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 public class FeedAdapter implements IDataSourceAdapter, Closeable {
     private final AbstractFeedDataFlowController controller;
@@ -34,8 +35,9 @@ public class FeedAdapter implements IDataSourceAdapter, Closeable {
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
-        controller.start(writer);
+    public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException {
+        controller.start(writer, tupleFilter, outputLimit);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 17a134b..9e75d69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -22,6 +22,7 @@ import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 public class GenericAdapter implements IDataSourceAdapter {
 
@@ -32,8 +33,9 @@ public class GenericAdapter implements IDataSourceAdapter {
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
-        controller.start(writer);
+    public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException, InterruptedException {
+        controller.start(writer, tupleFilter, outputLimit);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 1d7623d..82b8113 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -29,6 +29,8 @@ import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 /*
  * A single activity operator that provides the functionality of scanning data using an
@@ -36,21 +38,31 @@ import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNod
  */
 public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    private ITypedAdapterFactory adapterFactory;
+    private final ITypedAdapterFactory adapterFactory;
+
+    private final ITupleFilterFactory tupleFilterFactory;
+
+    private final long outputLimit;
 
     public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
-            ITypedAdapterFactory dataSourceAdapterFactory) {
+            ITypedAdapterFactory dataSourceAdapterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit) {
         super(spec, 0, 1);
         outRecDescs[0] = rDesc;
         this.adapterFactory = dataSourceAdapterFactory;
+        this.tupleFilterFactory = tupleFilterFactory;
+        this.outputLimit = outputLimit;
+    }
+
+    public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
+            ITypedAdapterFactory dataSourceAdapterFactory) {
+        this(spec, rDesc, dataSourceAdapterFactory, null, -1);
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+            IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
@@ -64,8 +76,10 @@ public class ExternalScanOperatorDescriptor extends AbstractSingleActivityOperat
                 }
                 try {
                     writer.open();
+                    ITupleFilter tupleFilter =
+                            tupleFilterFactory != null ? tupleFilterFactory.createTupleFilter(ctx) : null;
                     adapter = adapterFactory.createAdapter(ctx, partition);
-                    adapter.start(partition, writer);
+                    adapter.start(partition, writer, tupleFilter, outputLimit);
                     if (stats != null) {
                         stats.getTupleCounter().update(adapter.getProcessedTuples());
                     }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index effd59f..314cd20 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
@@ -34,6 +36,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.file.ITupleParser;
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -64,7 +67,12 @@ public class TestTypedAdapter extends FeedAdapter {
     }
 
     @Override
-    public void start(int partition, IFrameWriter writer) throws HyracksDataException {
+    public void start(int partition, IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit)
+            throws HyracksDataException {
+        if (tupleFilter != null || outputLimit >= 0) {
+            throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
+        }
+
         generator = new DummyGenerator(configuration, pos);
         ExecutorService executor = Executors.newSingleThreadExecutor();
         executor.execute(generator);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
index 708cdd8..14e70a9 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.library.adapter;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 
 class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
     TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
@@ -33,7 +34,7 @@ class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
     }
 
     @Override
-    public void start(IFrameWriter writer) {
+    public void start(IFrameWriter writer, ITupleFilter tupleFilter, long outputLimit) {
         throw new UnsupportedOperationException();
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 2ac8bf9..29a66b9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
@@ -105,10 +103,6 @@ public class DatasetDataSource extends DataSource {
             IProjectionInfo<?> projectionInfo) throws AlgebricksException {
         switch (dataset.getDatasetType()) {
             case EXTERNAL:
-                if (tupleFilterFactory != null || outputLimit >= 0) {
-                    throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
-                            "Tuple filter and limit are not supported by ExternalDataSource");
-                }
                 Dataset externalDataset = ((DatasetDataSource) dataSource).getDataset();
                 String itemTypeName = externalDataset.getItemTypeName();
                 IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
@@ -118,7 +112,8 @@ public class DatasetDataSource extends DataSource {
                 Map<String, String> properties = addProjectionInfo(projectionInfo, edd.getProperties());
                 ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
                         edd.getAdapter(), properties, (ARecordType) itemType, null, context.getWarningCollector());
-                return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+                return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+                        tupleFilterFactory, outputLimit);
             case INTERNAL:
                 DataSourceId id = getId();
                 DataverseName dataverseName = id.getDataverseName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index d415f73..f5fd7dd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
 import org.apache.asterix.metadata.api.IDatasourceFunction;
@@ -86,17 +84,14 @@ public abstract class FunctionDataSource extends DataSource {
             ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
             IProjectionInfo<?> projectionInfo) throws AlgebricksException {
-        if (tupleFilterFactory != null || outputLimit >= 0) {
-            throw CompilationException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
-                    "tuple filter and limit are not supported by FunctionDataSource");
-        }
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
         adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
         FunctionDataSourceFactory factory =
                 new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
         adapterFactory.configure(factory);
-        return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+        return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory,
+                tupleFilterFactory, outputLimit);
     }
 
     protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index dd8b3b4..e5fd6f7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -995,7 +995,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
+            JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory,
+            ITupleFilterFactory tupleFilterFactory, long outputLimit) throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
             throw new AlgebricksException("Can only scan datasets of records.");
         }
@@ -1004,8 +1005,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
-        ExternalScanOperatorDescriptor dataScanner =
-                new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory, tupleFilterFactory, outputLimit);
 
         AlgebricksPartitionConstraint constraint;
         try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index fae0d75..a5bc206 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -22,7 +22,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -37,7 +36,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -48,6 +46,7 @@ import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
 import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -233,8 +232,11 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
             cursor.next();
             matchingTupleCount++;
             ITupleReference tuple = cursor.getTuple();
-            if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
-                continue;
+            if (tupleFilter != null) {
+                referenceFilterTuple.reset(tuple);
+                if (!tupleFilter.accept(referenceFilterTuple)) {
+                    continue;
+                }
             }
             tb.reset();
 
@@ -388,52 +390,6 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
         }
     }
 
-    /**
-     * A wrapper class to wrap ITupleReference into IFrameTupleReference, as the latter
-     * is used by ITupleFilter
-     *
-     */
-    protected static class ReferenceFrameTupleReference implements IFrameTupleReference {
-        private ITupleReference tuple;
-
-        public IFrameTupleReference reset(ITupleReference tuple) {
-            this.tuple = tuple;
-            return this;
-        }
-
-        @Override
-        public int getFieldCount() {
-            return tuple.getFieldCount();
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-            return tuple.getFieldData(fIdx);
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            return tuple.getFieldStart(fIdx);
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            return tuple.getFieldLength(fIdx);
-        }
-
-        @Override
-        public IFrameTupleAccessor getFrameTupleAccessor() {
-            throw new UnsupportedOperationException(
-                    "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
-        }
-
-        @Override
-        public int getTupleIndex() {
-            throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
-        }
-
-    }
-
     @Override
     public String getDisplayName() {
         return "Index Search";
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java
new file mode 100644
index 0000000..f7d8169
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/ReferenceFrameTupleReference.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.common.tuples;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Adapts an ITupleReference to IFrameTupleReference, as the latter is used by ITupleFilter. The four field
+ * accessors delegate to the tuple given to reset(); getFrameTupleAccessor() and getTupleIndex() always throw.
+ */
+public final class ReferenceFrameTupleReference implements IFrameTupleReference {
+
+    private ITupleReference tuple; // wrapped tuple, kept by reference (not copied); null until reset() is called
+
+    public void reset(ITupleReference tuple) {
+        this.tuple = tuple;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return tuple.getFieldCount();
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return tuple.getFieldData(fIdx);
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return tuple.getFieldStart(fIdx);
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return tuple.getFieldLength(fIdx);
+    }
+
+    @Override
+    public IFrameTupleAccessor getFrameTupleAccessor() {
+        throw new UnsupportedOperationException(
+                "getFrameTupleAccessor is not supported by ReferenceFrameTupleReference");
+    }
+
+    @Override
+    public int getTupleIndex() {
+        throw new UnsupportedOperationException("getTupleIndex is not supported by ReferenceFrameTupleReference");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 596f4b0..dddaab1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -94,8 +94,11 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable extends BTreeSearchOpe
             cursor.next();
             matchingTupleCount++;
             ITupleReference tuple = cursor.getTuple();
-            if (tupleFilter != null && !tupleFilter.accept(referenceFilterTuple.reset(tuple))) {
-                continue;
+            if (tupleFilter != null) {
+                referenceFilterTuple.reset(tuple);
+                if (!tupleFilter.accept(referenceFilterTuple)) {
+                    continue;
+                }
             }
             tb.reset();