You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2020/09/03 23:47:51 UTC

[asterixdb] branch master updated: [ASTERIXDB-2771][*DB] Enabling LSM filters on datasets with meta records

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ca7927f  [ASTERIXDB-2771][*DB] Enabling LSM filters on datasets with meta records
ca7927f is described below

commit ca7927f7393ad3593279939bd501c2baaa499e39
Author: Xikui Wang <xk...@gmail.com>
AuthorDate: Thu Sep 3 11:27:57 2020 -0700

    [ASTERIXDB-2771][*DB] Enabling LSM filters on datasets with meta records
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    This patch enables LSM filters on datasets with meta records. It
    introduces a filterSourceIndicator that indicates where the filter value
    comes from, (null - no filter, 0 - from the record, 1 - from the meta
    record). In LangExpressionToPlanTranslator, filter and meta handling
    are pushed into the translate(Insert/Upsert/Delete) methods separately.
    Currently, only UPSERTs are allowed on datasets with meta records, and
    only in this case, the filter value may come from the meta records.
    This patch also renamed an existing test case with where on change feed
    to avoid confuison. Legacy datasets without filterSourceIndicator will
    have 0 (pointing to record) by default.
    
    Change-Id: I6189169cafab9d99b8662ec91cbdd801cfae9dba
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7647
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../rules/am/IntroduceLSMComponentFilterRule.java  |  67 ++++---
 .../translator/LangExpressionToPlanTranslator.java | 220 +++++++++++----------
 .../asterix/translator/util/ValidateUtil.java      |  15 +-
 .../asterix/app/translator/QueryTranslator.java    |   8 +-
 .../asterix/app/bootstrap/TestNodeController.java  |   4 +-
 .../dataflow/CheckpointInSecondaryIndexTest.java   |   6 +-
 .../dataflow/GlobalVirtualBufferCacheTest.java     |   8 +-
 .../test/dataflow/MultiPartitionLSMIndexTest.java  |   6 +-
 .../dataflow/SearchCursorComponentSwitchTest.java  |   6 +-
 .../asterix/test/dataflow/StorageTestUtils.java    |   2 +-
 .../apache/asterix/test/dataflow/TestDataset.java  |   4 +-
 .../storage/IndexDropOperatorNodePushableTest.java |   6 +-
 .../queries/filter_on_meta_0.sqlpp}                |  23 ++-
 .../queries/filter_on_meta_1.sqlpp}                |  23 ++-
 .../queries/filter_on_meta_2.sqlpp}                |  23 ++-
 .../queries/filter_on_meta_3.sqlpp}                |  24 ++-
 .../queries/filter_on_meta_4.sqlpp}                |  24 ++-
 .../queries/filter_on_meta_5.sqlpp}                |  25 ++-
 .../optimizerts/results/filter_on_meta_0.plan      |  11 ++
 .../optimizerts/results/filter_on_meta_1.plan      |  12 ++
 .../optimizerts/results/filter_on_meta_2.plan      |  12 ++
 .../optimizerts/results/filter_on_meta_3.plan      |  11 ++
 .../optimizerts/results/filter_on_meta_4.plan      |  12 ++
 .../optimizerts/results/filter_on_meta_5.plan      |  12 ++
 .../change-feed-filter-on-meta-dataset.1.ddl.sqlpp |  51 +++++
 ...nge-feed-filter-on-meta-dataset.2.update.sqlpp} |   6 +-
 ...ange-feed-filter-on-meta-dataset.3.query.sqlpp} |   4 +-
 ...change-feed-filter-on-meta-dataset.4.ddl.sqlpp} |   3 +-
 .../change-feed-with-where-on-meta.1.ddl.sqlpp}    |   0
 ...change-feed-with-where-on-meta.10.update.sqlpp} |   0
 .../change-feed-with-where-on-meta.11.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.12.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.13.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.14.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.15.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.16.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.17.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.18.ddl.sqlpp}   |   0
 .../change-feed-with-where-on-meta.2.update.sqlpp} |   0
 .../change-feed-with-where-on-meta.3.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.4.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.5.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.6.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.7.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.8.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.9.query.sqlpp}  |   0
 .../change-feed-filter-on-meta-dataset.1.adm       |   1 +
 .../change-feed-with-filter-on-meta.11.adm         |   0
 .../change-feed-with-filter-on-meta.12.adm         |   0
 .../change-feed-with-filter-on-meta.13.adm         |   0
 .../change-feed-with-filter-on-meta.14.adm         |   0
 .../change-feed-with-filter-on-meta.15.adm         |   0
 .../change-feed-with-filter-on-meta.16.adm         |   0
 .../change-feed-with-filter-on-meta.17.adm         |   0
 .../change-feed-with-filter-on-meta.3.adm          |   0
 .../change-feed-with-filter-on-meta.4.adm          |   0
 .../change-feed-with-filter-on-meta.5.adm          |   0
 .../change-feed-with-filter-on-meta.6.adm          |   0
 .../change-feed-with-filter-on-meta.7.adm          |   0
 .../change-feed-with-filter-on-meta.8.adm          |   0
 .../change-feed-with-filter-on-meta.9.adm          |   0
 .../test/resources/runtimets/testsuite_sqlpp.xml   |   9 +-
 .../asterix/common/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/asx_errormsg/en.properties  |   1 +
 asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj  |   5 +-
 .../lang/common/statement/InternalDetailsDecl.java |  15 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj    |   8 +-
 .../metadata/bootstrap/MetadataBootstrap.java      |   2 +-
 .../metadata/declared/MetadataProvider.java        |  13 +-
 .../apache/asterix/metadata/entities/Dataset.java  |   4 +-
 .../metadata/entities/InternalDatasetDetails.java  |  32 ++-
 .../DatasetTupleTranslator.java                    |  17 +-
 .../apache/asterix/metadata/utils/DatasetUtil.java |  29 ++-
 .../utils/SecondaryBTreeOperationsHelper.java      |   9 +-
 .../SecondaryCorrelatedBTreeOperationsHelper.java  |   9 +-
 .../utils/SecondaryIndexOperationsHelper.java      |   4 +-
 .../DatasetTupleTranslatorTest.java                |   2 +-
 .../IndexTupleTranslatorTest.java                  |   2 +-
 .../LSMPrimaryUpsertOperatorDescriptor.java        |  14 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java      |  27 ++-
 80 files changed, 611 insertions(+), 221 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index 95b7e17..c49670e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -89,21 +89,26 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
 
         Dataset dataset = getDataset(op, context);
+        Integer filterSourceIndicator = null;
         List<String> filterFieldName = null;
-        ARecordType recType = null;
+        ARecordType itemType = null;
         MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
         if (dataset != null && dataset.getDatasetType() == DatasetType.INTERNAL) {
+            filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(dataset);
             filterFieldName = DatasetUtil.getFilterField(dataset);
-            IAType itemType = mp.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-            if (itemType.getTypeTag() == ATypeTag.OBJECT) {
-                recType = (ARecordType) itemType;
+            IAType filterSourceType = filterSourceIndicator == null || filterSourceIndicator == 0
+                    ? mp.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName())
+                    : mp.findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+
+            if (filterSourceType.getTypeTag() == ATypeTag.OBJECT) {
+                itemType = (ARecordType) filterSourceType;
             }
         }
-        if (filterFieldName == null || recType == null) {
+        if (filterFieldName == null || itemType == null) {
             return false;
         }
 
-        IAType filterType = recType.getSubFieldType(filterFieldName);
+        IAType filterType = itemType.getSubFieldType(filterFieldName);
 
         typeEnvironment = context.getOutputTypeEnvironment(op);
         ILogicalExpression condExpr = ((SelectOperator) op).getCondition().getValue();
@@ -116,10 +121,11 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
 
             for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) {
                 IOptimizableFuncExpr optFuncExpr = analysisCtx.getMatchedFuncExpr(i);
-                boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, recType, datasetIndexes, context);
+                boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, itemType, datasetIndexes, context,
+                        filterSourceIndicator);
                 // the field name source should be from the dataset record, i.e. source should be == 0
                 if (found && optFuncExpr.getFieldName(0).equals(filterFieldName)
-                        && optFuncExpr.getFieldSource(0) == 0) {
+                        && optFuncExpr.getFieldSource(0) == filterSourceIndicator) {
                     optFuncExprs.add(optFuncExpr);
                 }
             }
@@ -490,8 +496,8 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
     }
 
     private boolean findMacthedExprFieldName(IOptimizableFuncExpr optFuncExpr, AbstractLogicalOperator op,
-            Dataset dataset, ARecordType recType, List<Index> datasetIndexes, IOptimizationContext context)
-            throws AlgebricksException {
+            Dataset dataset, ARecordType filterSourceType, List<Index> datasetIndexes, IOptimizationContext context,
+            Integer filterSourceIndicator) throws AlgebricksException {
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
@@ -503,13 +509,16 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
                     if (funcVarIndex == -1) {
                         continue;
                     }
-                    // TODO(ali): this SQ NPE should be investigated
-                    List<String> fieldName =
-                            getFieldNameFromSubAssignTree(optFuncExpr, descendantOp, varIndex, recType).second;
-                    if (fieldName == null) {
+                    Pair<ARecordType, List<String>> fieldNamePairs =
+                            getFieldNameFromSubAssignTree(optFuncExpr, descendantOp, varIndex, filterSourceType,
+                                    filterSourceIndicator, dataset.getPrimaryKeys().size());
+                    if (fieldNamePairs == null) {
                         return false;
                     }
-                    optFuncExpr.setFieldName(funcVarIndex, fieldName, 0);
+                    List<String> fieldName = fieldNamePairs.second;
+                    // Since we validated the filter source in getFieldNameFromSubAssignTree, we can safely set the
+                    // fieldSource to be filterSourceIndicator
+                    optFuncExpr.setFieldName(funcVarIndex, fieldName, filterSourceIndicator);
                     return true;
                 }
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
@@ -564,7 +573,7 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
                     IAType metaItemType = ((MetadataProvider) context.getMetadataProvider())
                             .findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
                     ARecordType metaRecType = (ARecordType) metaItemType;
-                    int numSecondaryKeys = KeyFieldTypeUtil.getNumSecondaryKeys(index, recType, metaRecType);
+                    int numSecondaryKeys = KeyFieldTypeUtil.getNumSecondaryKeys(index, filterSourceType, metaRecType);
                     List<String> fieldName;
                     int keySource;
                     if (varIndex >= numSecondaryKeys) {
@@ -596,7 +605,8 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
     }
 
     private Pair<ARecordType, List<String>> getFieldNameFromSubAssignTree(IOptimizableFuncExpr optFuncExpr,
-            AbstractLogicalOperator op, int varIndex, ARecordType recType) {
+            AbstractLogicalOperator op, int varIndex, ARecordType filterSourceType, Integer filterSourceIndicator,
+            int numOfPKeys) {
         AbstractLogicalExpression expr = null;
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
@@ -619,8 +629,12 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
             for (int varCheck = 0; varCheck < op.getInputs().size(); varCheck++) {
                 AbstractLogicalOperator nestedOp = (AbstractLogicalOperator) op.getInputs().get(varCheck).getValue();
                 if (nestedOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-                    if (varCheck == op.getInputs().size() - 1) {
-
+                    if (nestedOp.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+                        return null;
+                    }
+                    List<LogicalVariable> scannedVars = ((DataSourceScanOperator) nestedOp).getScanVariables();
+                    if (scannedVars.indexOf(usedVar) != filterSourceIndicator + numOfPKeys) {
+                        return null;
                     }
                 } else {
                     int nestedAssignVar = ((AssignOperator) nestedOp).getVariables().indexOf(usedVar);
@@ -630,9 +644,10 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
                     //get the nested info from the lower input
                     Pair<ARecordType, List<String>> lowerInfo = getFieldNameFromSubAssignTree(optFuncExpr,
                             (AbstractLogicalOperator) op.getInputs().get(varCheck).getValue(), nestedAssignVar,
-                            recType);
+                            filterSourceType, filterSourceIndicator, numOfPKeys);
                     if (lowerInfo != null) {
-                        recType = lowerInfo.first;
+                        // propagate filterSourceType in case the filter value comes from a nested attribute.
+                        filterSourceType = lowerInfo.first;
                         returnList = lowerInfo.second;
                     }
                 }
@@ -644,18 +659,18 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
                     return null;
                 }
                 returnList.add(fieldName);
-                return new Pair<>(recType, returnList);
+                return new Pair<>(filterSourceType, returnList);
             } else if (funcIdent == BuiltinFunctions.FIELD_ACCESS_BY_INDEX) {
                 Integer fieldIndex = ConstantExpressionUtil.getIntArgument(funcExpr, 1);
                 if (fieldIndex == null) {
                     return null;
                 }
-                returnList.add(recType.getFieldNames()[fieldIndex]);
-                IAType subType = recType.getFieldTypes()[fieldIndex];
+                returnList.add(filterSourceType.getFieldNames()[fieldIndex]);
+                IAType subType = filterSourceType.getFieldTypes()[fieldIndex];
                 if (subType.getTypeTag() == ATypeTag.OBJECT) {
-                    recType = (ARecordType) subType;
+                    filterSourceType = (ARecordType) subType;
                 }
-                return new Pair<>(recType, returnList);
+                return new Pair<>(filterSourceType, returnList);
             }
 
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 039b8db..fd4fd90 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -262,6 +262,7 @@ abstract class LangExpressionToPlanTranslator
             assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns));
         }
 
+        // Load does not support meta record now.
         List<String> additionalFilteringField = DatasetUtil.getFilterField(targetDatasource.getDataset());
         List<LogicalVariable> additionalFilteringVars;
         List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions;
@@ -365,67 +366,52 @@ abstract class LangExpressionToPlanTranslator
             topOp.getInputs().get(0).setValue(assignCollectionToSequence);
             ProjectOperator projectOperator = (ProjectOperator) topOp;
             projectOperator.getVariables().set(0, seqVar);
-            resVar = seqVar;
+
             DatasetDataSource targetDatasource =
                     validateDatasetInfo(metadataProvider, stmt.getDataverseName(), stmt.getDatasetName(), sourceLoc);
             List<Integer> keySourceIndicator =
                     ((InternalDatasetDetails) targetDatasource.getDataset().getDatasetDetails())
                             .getKeySourceIndicator();
-            ArrayList<LogicalVariable> vars = new ArrayList<>();
-            ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
+            ArrayList<LogicalVariable> pkeyVars = new ArrayList<>();
+            ArrayList<Mutable<ILogicalExpression>> pkeyExprs = new ArrayList<>();
             List<Mutable<ILogicalExpression>> varRefsForLoading = new ArrayList<>();
             List<List<String>> partitionKeys = targetDatasource.getDataset().getPrimaryKeys();
             int numOfPrimaryKeys = partitionKeys.size();
             for (int i = 0; i < numOfPrimaryKeys; i++) {
                 if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
                     // record part
-                    PlanTranslationUtil.prepareVarAndExpression(partitionKeys.get(i), resVar, vars, exprs,
+                    PlanTranslationUtil.prepareVarAndExpression(partitionKeys.get(i), seqVar, pkeyVars, pkeyExprs,
                             varRefsForLoading, context, sourceLoc);
                 } else {
                     // meta part
-                    PlanTranslationUtil.prepareMetaKeyAccessExpression(partitionKeys.get(i), unnestVar, exprs, vars,
-                            varRefsForLoading, context, sourceLoc);
+                    PlanTranslationUtil.prepareMetaKeyAccessExpression(partitionKeys.get(i), unnestVar, pkeyExprs,
+                            pkeyVars, varRefsForLoading, context, sourceLoc);
                 }
             }
 
-            AssignOperator assign = new AssignOperator(vars, exprs);
-            assign.setSourceLocation(sourceLoc);
-            List<String> additionalFilteringField = DatasetUtil.getFilterField(targetDatasource.getDataset());
-            List<LogicalVariable> additionalFilteringVars;
-            List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions;
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions = null;
-            AssignOperator additionalFilteringAssign = null;
-            if (additionalFilteringField != null) {
-                additionalFilteringVars = new ArrayList<>();
-                additionalFilteringAssignExpressions = new ArrayList<>();
-                additionalFilteringExpressions = new ArrayList<>();
-
-                PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, resVar, additionalFilteringVars,
-                        additionalFilteringAssignExpressions, additionalFilteringExpressions, context, sourceLoc);
-
-                additionalFilteringAssign =
-                        new AssignOperator(additionalFilteringVars, additionalFilteringAssignExpressions);
-                additionalFilteringAssign.getInputs().add(new MutableObject<>(topOp));
-                additionalFilteringAssign.setSourceLocation(sourceLoc);
-                assign.getInputs().add(new MutableObject<>(additionalFilteringAssign));
-            } else {
-                assign.getInputs().add(new MutableObject<>(topOp));
-            }
+            AssignOperator pkeyAssignOp = new AssignOperator(pkeyVars, pkeyExprs);
+            pkeyAssignOp.setSourceLocation(sourceLoc);
+            pkeyAssignOp.getInputs().add(new MutableObject<>(topOp));
+
+            // the filters and metas could be handled here once we have unified processing for metas in
+            // all insert/upsert/delete
 
+            VariableReferenceExpression seqVarRef = new VariableReferenceExpression(seqVar);
+            seqVarRef.setSourceLocation(sourceLoc);
+            Mutable<ILogicalExpression> seqRef = new MutableObject<>(seqVarRef);
             ILogicalOperator leafOperator;
             switch (stmt.getKind()) {
                 case INSERT:
-                    leafOperator = translateInsert(targetDatasource, resVar, varRefsForLoading,
-                            additionalFilteringExpressions, assign, stmt, resultMetadata);
+                    leafOperator = translateInsert(targetDatasource, seqRef, varRefsForLoading, seqVar, pkeyAssignOp,
+                            stmt, resultMetadata);
                     break;
                 case UPSERT:
-                    leafOperator = translateUpsert(targetDatasource, resVar, varRefsForLoading,
-                            additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, topOp, exprs,
-                            additionalFilteringAssign, stmt, resultMetadata);
+                    leafOperator = translateUpsert(targetDatasource, seqRef, varRefsForLoading, pkeyAssignOp, unnestVar,
+                            topOp, pkeyExprs, seqVar, stmt, resultMetadata);
                     break;
                 case DELETE:
-                    leafOperator = translateDelete(targetDatasource, resVar, varRefsForLoading,
-                            additionalFilteringExpressions, assign, stmt);
+                    leafOperator =
+                            translateDelete(targetDatasource, seqRef, varRefsForLoading, seqVar, pkeyAssignOp, stmt);
                     break;
                 default:
                     throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -439,9 +425,8 @@ abstract class LangExpressionToPlanTranslator
         return plan;
     }
 
-    protected ILogicalOperator translateDelete(DatasetDataSource targetDatasource, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
+    private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
             ICompiledDmlStatement stmt) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (targetDatasource.getDataset().hasMetaPart()) {
@@ -449,12 +434,19 @@ abstract class LangExpressionToPlanTranslator
                     targetDatasource.getDataset().getDatasetName()
                             + ": delete from dataset is not supported on Datasets with Meta records");
         }
-        VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
-        varRef.setSourceLocation(stmt.getSourceLocation());
-        InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource,
-                new MutableObject<>(varRef), varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
-        deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        deleteOp.getInputs().add(new MutableObject<>(assign));
+
+        List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+
+        // currently, meta-datasets cannot be inserted.
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(pkeyAssignOp, filterField, seqVar, sourceLoc);
+        }
+
+        InsertDeleteUpsertOperator deleteOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
+        deleteOp.setAdditionalFilteringExpressions(filterExprs);
+        deleteOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
         deleteOp.setSourceLocation(sourceLoc);
         DelegateOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(deleteOp));
@@ -462,12 +454,11 @@ abstract class LangExpressionToPlanTranslator
         return leafOperator;
     }
 
-    protected ILogicalOperator translateUpsert(DatasetDataSource targetDatasource, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
-            List<String> additionalFilteringField, LogicalVariable unnestVar, ILogicalOperator topOp,
-            List<Mutable<ILogicalExpression>> exprs, AssignOperator additionalFilteringAssign,
-            ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
+    private ILogicalOperator translateUpsert(DatasetDataSource targetDatasource,
+            Mutable<ILogicalExpression> payloadVarRef, List<Mutable<ILogicalExpression>> varRefsForLoading,
+            ILogicalOperator pkeyAssignOp, LogicalVariable unnestVar, ILogicalOperator topOp,
+            List<Mutable<ILogicalExpression>> pkeyExprs, LogicalVariable seqVar, ICompiledDmlStatement stmt,
+            IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (!targetDatasource.getDataset().allow(topOp, DatasetUtil.OP_UPSERT)) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -479,15 +470,15 @@ abstract class LangExpressionToPlanTranslator
         Expression returnExpression = compiledUpsert.getReturnExpression();
         InsertDeleteUpsertOperator upsertOp;
         ILogicalOperator rootOperator;
+        List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+
         if (targetDatasource.getDataset().hasMetaPart()) {
             if (returnExpression != null) {
                 throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                         "Returning not allowed on datasets with Meta records");
             }
-            AssignOperator metaAndKeysAssign;
             List<LogicalVariable> metaAndKeysVars;
             List<Mutable<ILogicalExpression>> metaAndKeysExprs;
-            List<Mutable<ILogicalExpression>> metaExpSingletonList;
             metaAndKeysVars = new ArrayList<>();
             metaAndKeysExprs = new ArrayList<>();
             // add the meta function
@@ -499,18 +490,16 @@ abstract class LangExpressionToPlanTranslator
             metaFunction.setSourceLocation(sourceLoc);
             // create assign for the meta part
             LogicalVariable metaVar = context.newVar();
-            metaExpSingletonList = new ArrayList<>(1);
             VariableReferenceExpression metaVarRef = new VariableReferenceExpression(metaVar);
             metaVarRef.setSourceLocation(sourceLoc);
-            metaExpSingletonList.add(new MutableObject<>(metaVarRef));
             metaAndKeysVars.add(metaVar);
             metaAndKeysExprs.add(new MutableObject<>(metaFunction));
             project.getVariables().add(metaVar);
             varRefsForLoading.clear();
-            for (Mutable<ILogicalExpression> assignExpr : exprs) {
+            for (Mutable<ILogicalExpression> assignExpr : pkeyExprs) {
                 if (assignExpr.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                     AbstractFunctionCallExpression funcCall = (AbstractFunctionCallExpression) assignExpr.getValue();
-                    funcCall.substituteVar(resVar, unnestVar);
+                    funcCall.substituteVar(seqVar, unnestVar);
                     LogicalVariable pkVar = context.newVar();
                     metaAndKeysVars.add(pkVar);
                     metaAndKeysExprs.add(new MutableObject<>(assignExpr.getValue()));
@@ -519,58 +508,68 @@ abstract class LangExpressionToPlanTranslator
                 }
             }
             // A change feed, we don't need the assign to access PKs
-            VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
-            varRef.setSourceLocation(stmt.getSourceLocation());
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, new MutableObject<>(varRef), varRefsForLoading,
-                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
+                    Collections.singletonList(new MutableObject<>(metaVarRef)), InsertDeleteUpsertOperator.Kind.UPSERT,
+                    false);
             upsertOp.setUpsertIndicatorVar(context.newVar());
             upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the original record
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(targetDatasource.getItemType());
             upsertOp.setSourceLocation(sourceLoc);
-            if (targetDatasource.getDataset().hasMetaPart()) {
-                List<LogicalVariable> metaVars = new ArrayList<>();
-                metaVars.add(context.newVar());
-                upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
-                List<Object> metaTypes = new ArrayList<>();
-                metaTypes.add(targetDatasource.getMetaItemType());
-                upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
-            }
 
-            if (additionalFilteringField != null) {
-                upsertOp.setPrevFilterVar(context.newVar());
-                upsertOp.setPrevFilterType(
-                        ((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
-                additionalFilteringAssign.getInputs().clear();
-                additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
-                upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
-            } else {
-                upsertOp.getInputs().add(assign.getInputs().get(0));
-            }
-            metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
+            List<LogicalVariable> metaVars = new ArrayList<>();
+            metaVars.add(context.newVar());
+            upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
+            List<Object> metaTypes = new ArrayList<>();
+            metaTypes.add(targetDatasource.getMetaItemType());
+            upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+
+            // insert meta key assign before project
+            AssignOperator metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
             metaAndKeysAssign.getInputs().add(topOp.getInputs().get(0));
             metaAndKeysAssign.setSourceLocation(sourceLoc);
             topOp.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
-            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+
+            // insert filter assign
+            if (filterField != null) {
+                LogicalVariable filterSourceVar =
+                        DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset()) == 0 ? seqVar : metaVar;
+                ARecordType filterSourceType = DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset()) == 0
+                        ? (ARecordType) targetDatasource.getItemType()
+                        : (ARecordType) targetDatasource.getMetaItemType();
+
+                List<Mutable<ILogicalExpression>> filterExprs =
+                        generatedFilterExprs(pkeyAssignOp, filterField, filterSourceVar, sourceLoc);
+
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(filterSourceType.getFieldType(filterField.get(0)));
+                upsertOp.setAdditionalFilteringExpressions(filterExprs);
+                upsertOp.getInputs().add(pkeyAssignOp.getInputs().get(0));
+            } else {
+                upsertOp.getInputs().add(new MutableObject<>(topOp));
+                upsertOp.setAdditionalFilteringExpressions(null);
+            }
         } else {
-            VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
-            varRef.setSourceLocation(stmt.getSourceLocation());
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, new MutableObject<>(varRef), varRefsForLoading,
+            ARecordType recordType = (ARecordType) targetDatasource.getItemType();
+            List<Mutable<ILogicalExpression>> filterExprs = null;
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
-            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-            upsertOp.getInputs().add(new MutableObject<>(assign));
+
+            if (filterField != null) {
+                // add filter assign
+                filterExprs = generatedFilterExprs(pkeyAssignOp, filterField, seqVar, sourceLoc);
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(recordType.getFieldType(filterField.get(0)));
+            }
+            upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+            upsertOp.setAdditionalFilteringExpressions(filterExprs);
             upsertOp.setSourceLocation(sourceLoc);
             upsertOp.setUpsertIndicatorVar(context.newVar());
             upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the original record
-            ARecordType recordType = (ARecordType) targetDatasource.getItemType();
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(recordType);
-            if (additionalFilteringField != null) {
-                upsertOp.setPrevFilterVar(context.newVar());
-                upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
-            }
         }
         DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(returnExpression == null));
         delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
@@ -581,9 +580,8 @@ abstract class LangExpressionToPlanTranslator
         return processReturningExpression(rootOperator, upsertOp, compiledUpsert, resultMetadata);
     }
 
-    protected ILogicalOperator translateInsert(DatasetDataSource targetDatasource, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
+    private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (targetDatasource.getDataset().hasMetaPart()) {
@@ -591,13 +589,20 @@ abstract class LangExpressionToPlanTranslator
                     targetDatasource.getDataset().getDatasetName()
                             + ": insert into dataset is not supported on Datasets with Meta records");
         }
+
+        List<String> filterField = DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+
+        // currently, meta-datasets cannot be inserted.
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(pkeyAssignOp, filterField, seqVar, sourceLoc);
+        }
+
         // Adds the insert operator.
-        VariableReferenceExpression varRef = new VariableReferenceExpression(resVar);
-        varRef.setSourceLocation(stmt.getSourceLocation());
-        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource,
-                new MutableObject<>(varRef), varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
-        insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        insertOp.getInputs().add(new MutableObject<>(assign));
+        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
+        insertOp.setAdditionalFilteringExpressions(filterExprs);
+        insertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
         insertOp.setSourceLocation(sourceLoc);
 
         // Adds the commit operator.
@@ -611,6 +616,21 @@ abstract class LangExpressionToPlanTranslator
         return processReturningExpression(rootOperator, insertOp, compiledInsert, resultMetadata);
     }
 
+    private List<Mutable<ILogicalExpression>> generatedFilterExprs(ILogicalOperator pkeyAssignOp,
+            List<String> filterField, LogicalVariable seqVar, SourceLocation sourceLoc) {
+        List<LogicalVariable> filterVars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> filterAssignExprs = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> filterExprs = new ArrayList<>();
+
+        PlanTranslationUtil.prepareVarAndExpression(filterField, seqVar, filterVars, filterAssignExprs, filterExprs,
+                context, sourceLoc);
+        AssignOperator additionalFilteringAssign = new AssignOperator(filterVars, filterAssignExprs);
+        additionalFilteringAssign.getInputs().add(pkeyAssignOp.getInputs().get(0));
+        additionalFilteringAssign.setSourceLocation(sourceLoc);
+        pkeyAssignOp.getInputs().set(0, new MutableObject<>(additionalFilteringAssign));
+        return filterExprs;
+    }
+
     // Stitches the translated operators for the returning expression into the query
     // plan.
     protected ILogicalOperator processReturningExpression(ILogicalOperator inputOperator,
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
index e587e70..2869313 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
@@ -45,10 +45,14 @@ public class ValidateUtil {
     /**
      * Validates the field that will be used as filter for the components of an LSM index.
      *
-     * @param dataset
-     *            the dataset
      * @param recordType
      *            the record type
+     * @param metaType
+     *            the meta record type
+     * @param filterSourceIndicator
+     *            indicates where the filter attribute comes from, 0 for record, 1 for meta record.
+     *            since this method is called only when a filter field presents, filterSourceIndicator will not be null
+     *
      * @param filterField
      *            the full name of the field
      * @param sourceLoc
@@ -57,9 +61,10 @@ public class ValidateUtil {
      *             if field type can't be a filter type.
      *             if field type is nullable.
      */
-    public static void validateFilterField(ARecordType recordType, List<String> filterField, SourceLocation sourceLoc)
-            throws AlgebricksException {
-        IAType fieldType = recordType.getSubFieldType(filterField);
+    public static void validateFilterField(ARecordType recordType, ARecordType metaType, Integer filterSourceIndicator,
+            List<String> filterField, SourceLocation sourceLoc) throws AlgebricksException {
+        ARecordType itemType = filterSourceIndicator == 0 ? recordType : metaType;
+        IAType fieldType = itemType.getSubFieldType(filterField);
         if (fieldType == null) {
             throw new CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND, sourceLoc,
                     RecordUtil.toFullyQualifiedName(filterField));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index dd1f193..9a8895b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -769,8 +769,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                             metaRecType, partitioningExprs, keySourceIndicators, autogenerated, sourceLoc);
 
                     List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
+                    Integer filterSourceIndicator =
+                            ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterSourceIndicator();
+
                     if (filterField != null) {
-                        ValidateUtil.validateFilterField(aRecordType, filterField, sourceLoc);
+                        ValidateUtil.validateFilterField(aRecordType, metaRecType, filterSourceIndicator, filterField,
+                                sourceLoc);
                     }
                     if (compactionPolicy == null && filterField != null) {
                         // If the dataset has a filter and the user didn't specify a merge
@@ -781,7 +785,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     }
                     datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                             InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-                            keySourceIndicators, partitioningTypes, autogenerated, filterField);
+                            keySourceIndicators, partitioningTypes, autogenerated, filterSourceIndicator, filterField);
                     break;
                 case EXTERNAL:
                     ExternalDetailsDecl externalDetails = (ExternalDetailsDecl) dd.getDatasetDetailsDecl();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index ebf98df..1040781 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -800,8 +800,8 @@ public class TestNodeController {
                 new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
                         indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                         recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
-                        modificationCallbackFactory, searchCallbackFactory,
-                        keyIndexes.length, recordType, -1, frameOpCallbackFactory == null
+                        modificationCallbackFactory, searchCallbackFactory, keyIndexes.length,
+                        0, recordType, -1, frameOpCallbackFactory == null
                                 ? dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory,
                         MissingWriterFactory.INSTANCE, hasSecondaries);
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
index b43c445..d60702f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -165,9 +165,9 @@ public class CheckpointInSecondaryIndexTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
                 INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 0e51f28..2034aa8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -189,16 +189,16 @@ public class GlobalVirtualBufferCacheTest {
 
     private void createIndex() throws Exception {
         dataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "ds", StorageTestUtils.DATAVERSE_NAME,
-                StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        StorageTestUtils.PARTITIONING_KEYS, null, null, null, false, null),
+                StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME, NoMergePolicyFactory.NAME,
+                null, new InternalDatasetDetails(null, PartitioningStrategy.HASH, StorageTestUtils.PARTITIONING_KEYS,
+                        null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID, 0);
 
         filteredDataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "filtered_ds",
                 StorageTestUtils.DATAVERSE_NAME, StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
                 NoMergePolicyFactory.NAME, null,
                 new InternalDatasetDetails(null, PartitioningStrategy.HASH, StorageTestUtils.PARTITIONING_KEYS, null,
-                        null, null, false, Collections.singletonList("value")),
+                        null, null, false, 0, Collections.singletonList("value")),
                 null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID + 1, 0);
 
         primaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 67c291f..41b889d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -155,9 +155,9 @@ public class MultiPartitionLSMIndexTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
                 INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 28da85c..a9e86cb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -124,9 +124,9 @@ public class SearchCursorComponentSwitchTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                 storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index e41c193..3ef1e62 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -95,7 +95,7 @@ public class StorageTestUtils {
     public static final TestDataset DATASET =
             new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
                     NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                            PARTITIONING_KEYS, null, null, null, false, null),
+                            PARTITIONING_KEYS, null, null, null, false, null, null),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
 
     private StorageTestUtils() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index a6a53cb..f28e9bb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -71,9 +71,9 @@ public class TestDataset extends Dataset {
     public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
             throws AlgebricksException {
-        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType);
+        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType, metaType);
         IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(this,
-                recordType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+                recordType, metaType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
         IResourceFactory resourceFactory =
                 TestLsmBTreeResourceFactoryProvider.INSTANCE.getResourceFactory(mdProvider, this, index, recordType,
                         metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index f7f7207..d08a960 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -105,9 +105,9 @@ public class IndexDropOperatorNodePushableTest {
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
             Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                    NoMergePolicyFactory.NAME,
-                    null, new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH,
-                            partitioningKeys, null, null, null, false, null),
+                    NoMergePolicyFactory.NAME, null,
+                    new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys,
+                            null, null, null, false, null, null),
                     null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID, 0);
             // create dataset
             TestNodeController.PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_0.sqlpp
similarity index 63%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_0.sqlpp
index 6a6c12e..33d826b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_0.sqlpp
@@ -16,7 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on record
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key meta().`key` with filter on attr;
+
+select * from KVStore WHERE attr > 10;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_1.sqlpp
similarity index 62%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_1.sqlpp
index 6a6c12e..3a5743c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_1.sqlpp
@@ -16,7 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on meta
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key meta().`key` with filter on meta().seq;
+
+select * from KVStore WHERE meta().seq > 11;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_2.sqlpp
similarity index 61%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_2.sqlpp
index 6a6c12e..415e1a9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_2.sqlpp
@@ -16,7 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta key and query on meta key
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key meta().`key` with filter on meta().`key`;
+
+select k, meta() as meta from KVStore k WHERE meta().`key` > 12;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_3.sqlpp
similarity index 62%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_3.sqlpp
index 6a6c12e..4cca505 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_3.sqlpp
@@ -16,7 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on record
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32,
+  attr: int64
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key meta().`key` with filter on attr;
+
+select * from KVStore WHERE attr > 10;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_4.sqlpp
similarity index 61%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_4.sqlpp
index 6a6c12e..cfd4363 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_4.sqlpp
@@ -16,7 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on meta
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32,
+  attr: int64
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key meta().`key` with filter on meta().attr;
+
+select * from KVStore WHERE meta().attr > 10;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_5.sqlpp
similarity index 59%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_5.sqlpp
index 6a6c12e..0383e88 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_5.sqlpp
@@ -16,7 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on meta with a composite primary key
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr1: int64,
+  attr2: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32,
+  attr: int64
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key attr1, attr2 with filter on meta().attr;
+
+select * from KVStore WHERE meta().attr > 10;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_0.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_0.plan
new file mode 100644
index 0000000..fbdf7a6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_0.plan
@@ -0,0 +1,11 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |PARTITIONED|
+          -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_1.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_1.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_1.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_2.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_2.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_3.plan
new file mode 100644
index 0000000..fbdf7a6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_3.plan
@@ -0,0 +1,11 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |PARTITIONED|
+          -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_4.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_4.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_5.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_5.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_5.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.ddl.sqlpp
new file mode 100644
index 0000000..fbcc7db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.ddl.sqlpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"; you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  seq:int64,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key meta().`key` with filter on meta().seq;
+
+create feed KVChangeStream with {
+ "adapter-name" : "adapter",
+  "type-name" : "DocumentType",
+  "meta-type-name" : "KVMetaType",
+  "reader" : "org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory",
+  "parser" : "record-with-metadata",
+  "format" : "dcp",
+  "record-format" : "json",
+  "change-feed" : "true",
+  "key-indexes" : "0",
+  "key-indicators" : "1",
+  "num-of-records" : "1000"
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.2.update.sqlpp
similarity index 86%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.2.update.sqlpp
index 459bb88..76ad55b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.2.update.sqlpp
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use KeyVerse;
 
-use test;
+set `wait-for-completion-feed` "true";
+connect feed KVChangeStream to dataset KVStore;
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from UK_DS1 c)} from UK_DS1 v order by v.id;
\ No newline at end of file
+start feed KVChangeStream;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.3.query.sqlpp
similarity index 92%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.3.query.sqlpp
index f12a2b7..61c1d45 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.3.query.sqlpp
@@ -16,5 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use KeyVerse;
 
-drop dataverse test;
\ No newline at end of file
+select k, meta() as meta from KVStore k
+where k.id = 5;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.4.ddl.sqlpp
similarity index 97%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.4.ddl.sqlpp
index f12a2b7..46a5fad 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.4.ddl.sqlpp
@@ -16,5 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-drop dataverse test;
\ No newline at end of file
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.1.ddl.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.1.ddl.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.1.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.10.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.10.update.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.10.update.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.10.update.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.11.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.11.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.12.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.12.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.13.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.13.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.14.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.14.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.15.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.15.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.16.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.16.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.17.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.17.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.18.ddl.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.18.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.2.update.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.2.update.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.2.update.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.3.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.3.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.4.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.4.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.5.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.5.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.6.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.6.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.7.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.7.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.8.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.8.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.9.query.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.query.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.9.query.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.adm
new file mode 100644
index 0000000..fa522e6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.adm
@@ -0,0 +1 @@
+{ "k": { "id": 5, "name": "Ian Maxon", "exp": 15 }, "meta": { "key": "8-2", "vbucket": 8, "seq": 21, "cas": 5, "expiration": 8004, "flags": 0, "revSeq": 0, "lockTime": 163 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.11.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.11.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.12.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.12.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.13.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.13.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.14.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.14.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.15.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.15.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.16.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.16.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.17.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.17.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.3.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.3.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.4.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.4.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.5.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.5.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.6.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.6.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.7.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.8.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.8.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.9.adm
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.adm
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.9.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9e83b57..91abbea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -12342,6 +12342,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-filter-on-meta-dataset">
+        <output-dir compare="Text">change-feed-filter-on-meta-dataset</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="change-feed-with-meta-pk-index">
         <output-dir compare="Text">change-feed-with-meta-pk-index</output-dir>
       </compilation-unit>
@@ -12533,8 +12538,8 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
-      <compilation-unit name="change-feed-with-filter-on-meta">
-        <output-dir compare="Text">change-feed-with-filter-on-meta</output-dir>
+      <compilation-unit name="change-feed-with-where-on-meta">
+        <output-dir compare="Text">change-feed-with-where-on-meta</output-dir>
       </compilation-unit>
     </test-case>
   </test-group>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 00c7210..5208b5d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -218,6 +218,7 @@ public class ErrorCode {
     public static final int UNKNOWN_ADAPTER = 1125;
     public static final int INVALID_EXTERNAL_IDENTIFIER_SIZE = 1126;
     public static final int UNSUPPORTED_ADAPTER_LANGUAGE = 1127;
+    public static final int INCONSISTENT_FILTER_INDICATOR = 1128;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 8e761d9..07a496c 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -213,6 +213,7 @@
 1125 = Cannot find adapter with name %1$s
 1126 = Invalid number of elements in external identifier. Expected %1$s elements for %2$s language
 1127 = Unsupported adapter language: %1$s
+1128 = Filter field is not defined properly
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 00d183f..e903977 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -561,11 +561,12 @@ DatasetDecl DatasetSpecification() throws ParseException:
         if(filterField!=null && filterField.first!=0){
           throw new ParseException("A filter field can only be a field in the main record of the dataset.");
         }
-        InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
+        try{
+          InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second,
                                                           primaryKeyFields.first,
                                                           autogenerated,
+                                                          filterField == null? null : filterField.first,
                                                           filterField == null? null : filterField.second);
-        try{
           dsetDecl = new DatasetDecl(nameComponents.first,
                                    nameComponents.second,
                                    new TypeReferenceExpression(typeComponents),
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
index f312ddd..3ddd261 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
@@ -20,18 +20,28 @@ package org.apache.asterix.lang.common.statement;
 
 import java.util.List;
 
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+
 public class InternalDetailsDecl implements IDatasetDetailsDecl {
     private final List<List<String>> partitioningExprs;
     private final List<Integer> keySourceIndicators;
+    private final Integer filterSourceIndicator;
     private final boolean autogenerated;
     private final List<String> filterField;
 
     public InternalDetailsDecl(List<List<String>> partitioningExpr, List<Integer> keySourceIndicators,
-            boolean autogenerated, List<String> filterField) {
+            boolean autogenerated, Integer filterSourceIndicator, List<String> filterField)
+            throws CompilationException {
         this.partitioningExprs = partitioningExpr;
         this.keySourceIndicators = keySourceIndicators;
         this.autogenerated = autogenerated;
+        if (filterSourceIndicator == null && filterField != null
+                || filterSourceIndicator != null && filterField == null) {
+            throw new CompilationException(ErrorCode.INCONSISTENT_FILTER_INDICATOR);
+        }
         this.filterField = filterField;
+        this.filterSourceIndicator = filterSourceIndicator;
     }
 
     public List<List<String>> getPartitioningExprs() {
@@ -50,4 +60,7 @@ public class InternalDetailsDecl implements IDatasetDetailsDecl {
         return filterField;
     }
 
+    public Integer getFilterSourceIndicator() {
+        return filterSourceIndicator;
+    }
 }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 201ff7d..347dc18 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -835,13 +835,9 @@ DatasetDecl DatasetSpecification(Token startStmtToken) throws ParseException:
   ( LOOKAHEAD(2) <WITH> <FILTER> <ON>  filterField = NestedField() )?
   ( <WITH> withRecord = RecordConstructor() )?
   {
-    if(filterField!=null && filterField.first!=0){
-      throw new SqlppParseException(getSourceLocation(startStmtToken),
-        "A filter field can only be a field in the main record of the dataset.");
-    }
-    InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second, primaryKeyFields.first, autogenerated,
-      filterField == null? null : filterField.second);
     try {
+      InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second, primaryKeyFields.first, autogenerated,
+        filterField == null? null : filterField.first, filterField == null? null : filterField.second);
       stmt = new DatasetDecl(nameComponents.first, nameComponents.second, typeExpr, metaTypeExpr, hints,
         DatasetType.INTERNAL, idd, withRecord, ifNotExists);
       return addSourceLocation(stmt, startStmtToken);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 4a64b8a..cf089fa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -211,7 +211,7 @@ public class MetadataBootstrap {
         for (int i = 0; i < indexes.length; i++) {
             IDatasetDetails id = new InternalDatasetDetails(FileStructure.BTREE, PartitioningStrategy.HASH,
                     indexes[i].getPartitioningExpr(), indexes[i].getPartitioningExpr(), null,
-                    indexes[i].getPartitioningExprType(), false, null);
+                    indexes[i].getPartitioningExprType(), false, null, null);
             MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                     new Dataset(indexes[i].getDataverseName(), indexes[i].getIndexedDatasetName(),
                             indexes[i].getDataverseName(), indexes[i].getPayloadRecordType().getTypeName(),
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 4c19b61..4e721cb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -965,18 +965,21 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
         // set the record permutation
         fieldPermutation[i++] = inputSchema.findVariable(payload);
-        // set the filters' permutations.
-        if (numFilterFields > 0) {
-            int idx = inputSchema.findVariable(filterKeys.get(0));
-            fieldPermutation[i++] = idx;
-        }
 
+        // set the meta record permutation
         if (additionalNonFilterFields != null) {
             for (LogicalVariable var : additionalNonFilterFields) {
                 int idx = inputSchema.findVariable(var);
                 fieldPermutation[i++] = idx;
             }
         }
+
+        // set the filters' permutations.
+        if (numFilterFields > 0) {
+            int idx = inputSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
+        }
+
         return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
                 context.getMissingWriterFactory());
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e66057a..87c7d87 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -464,9 +464,9 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
     public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
             throws AlgebricksException {
-        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType);
+        ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(this, recordType, metaType);
         IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(this,
-                recordType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+                recordType, metaType, mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
         IResourceFactory resourceFactory;
         switch (index.getIndexType()) {
             case BTREE:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
index e4f8948..c82b86a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
@@ -22,6 +22,7 @@ package org.apache.asterix.metadata.entities;
 import java.io.DataOutput;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.OrderedListBuilder;
@@ -61,15 +62,18 @@ public class InternalDatasetDetails implements IDatasetDetails {
     private final List<List<String>> primaryKeys;
     private final List<IAType> primaryKeyTypes;
     private final boolean autogenerated;
+    private final Integer filterSourceIndicator;
     private final List<String> filterField;
     private final List<Integer> keySourceIndicators;
 
     public static final String FILTER_FIELD_NAME = "FilterField";
+    public static final String FILTER_SOURCE_INDICATOR_FIELD_NAME = "FilterSourceIndicator";
     public static final String KEY_FILD_SOURCE_INDICATOR_FIELD_NAME = "KeySourceIndicator";
 
     public InternalDatasetDetails(FileStructure fileStructure, PartitioningStrategy partitioningStrategy,
             List<List<String>> partitioningKey, List<List<String>> primaryKey, List<Integer> keyFieldIndicators,
-            List<IAType> primaryKeyType, boolean autogenerated, List<String> filterField) {
+            List<IAType> primaryKeyType, boolean autogenerated, Integer filterSourceIndicator,
+            List<String> filterField) {
         this.fileStructure = fileStructure;
         this.partitioningStrategy = partitioningStrategy;
         this.partitioningKeys = partitioningKey;
@@ -84,7 +88,14 @@ public class InternalDatasetDetails implements IDatasetDetails {
         this.keySourceIndicators = keyFieldIndicators;
         this.primaryKeyTypes = primaryKeyType;
         this.autogenerated = autogenerated;
-        this.filterField = filterField;
+        if (filterSourceIndicator != null) {
+            // to make sure filter source indicator and filter field is consistent
+            this.filterSourceIndicator = filterSourceIndicator;
+            this.filterField = Objects.requireNonNull(filterField);
+        } else {
+            this.filterSourceIndicator = null;
+            this.filterField = null;
+        }
     }
 
     public List<List<String>> getPartitioningKey() {
@@ -119,6 +130,10 @@ public class InternalDatasetDetails implements IDatasetDetails {
         return filterField;
     }
 
+    public Integer getFilterSourceIndicator() {
+        return filterSourceIndicator;
+    }
+
     @Override
     public DatasetType getDatasetType() {
         return DatasetType.INTERNAL;
@@ -209,10 +224,21 @@ public class InternalDatasetDetails implements IDatasetDetails {
                 fieldValue);
 
         // write filter fields if any
+        Integer filterSourceIndicator = getFilterSourceIndicator();
         List<String> filterField = getFilterField();
         if (filterField != null) {
-            listBuilder.reset(heterogeneousList);
             ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage();
+            // write filter source indicator
+            nameValue.reset();
+            aString.setValue(FILTER_SOURCE_INDICATOR_FIELD_NAME);
+            stringSerde.serialize(aString, nameValue.getDataOutput());
+            fieldValue.reset();
+            aInt8.setValue(filterSourceIndicator.byteValue());
+            int8Serde.serialize(aInt8, fieldValue.getDataOutput());
+            internalRecordBuilder.addField(nameValue, fieldValue);
+
+            // write filter fields
+            listBuilder.reset(heterogeneousList);
             nameValue.reset();
             aString.setValue(FILTER_FIELD_NAME);
             stringSerde.serialize(aString, nameValue.getDataOutput());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index f0567a4..e4069c2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -151,11 +151,25 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
                         .getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_AUTOGENERATED_FIELD_INDEX))
                                 .getBoolean();
 
+                // check if there is a filter source indicator
+                Integer filterSourceIndicator = null;
+                int filterSourceIndicatorPos = datasetDetailsRecord.getType()
+                        .getFieldIndex(InternalDatasetDetails.FILTER_SOURCE_INDICATOR_FIELD_NAME);
+                if (filterSourceIndicatorPos >= 0) {
+                    filterSourceIndicator =
+                            (int) ((AInt8) datasetDetailsRecord.getValueByPos(filterSourceIndicatorPos)).getByteValue();
+                }
+
                 // Check if there is a filter field.
                 List<String> filterField = null;
                 int filterFieldPos =
                         datasetDetailsRecord.getType().getFieldIndex(InternalDatasetDetails.FILTER_FIELD_NAME);
                 if (filterFieldPos >= 0) {
+                    // backward compatibility, if a dataset contains filter field but no filter source indicator
+                    // we set the indicator to 0 by default.
+                    if (filterSourceIndicator == null) {
+                        filterSourceIndicator = 0;
+                    }
                     filterField = new ArrayList<>();
                     cursor = ((AOrderedList) datasetDetailsRecord.getValueByPos(filterFieldPos)).getCursor();
                     while (cursor.next()) {
@@ -180,7 +194,8 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
                 }
 
                 datasetDetails = new InternalDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
-                        partitioningKey, keyFieldSourceIndicator, partitioningKeyType, autogenerated, filterField);
+                        partitioningKey, keyFieldSourceIndicator, partitioningKeyType, autogenerated,
+                        filterSourceIndicator, filterField);
                 break;
             }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 3b312bb..762b9e5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -103,35 +103,43 @@ public class DatasetUtil {
     private DatasetUtil() {
     }
 
+    public static Integer getFilterSourceIndicator(Dataset dataset) {
+        return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterSourceIndicator();
+    }
+
     public static List<String> getFilterField(Dataset dataset) {
         return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
     }
 
     public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
-            ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+            ARecordType recordType, ARecordType metaType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
             throws AlgebricksException {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return null;
         }
+        Integer filterFieldSourceIndicator = getFilterSourceIndicator(dataset);
         List<String> filterField = getFilterField(dataset);
         if (filterField == null) {
             return null;
         }
         IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
+        ARecordType itemType = filterFieldSourceIndicator == 0 ? recordType : metaType;
         IAType type = itemType.getSubFieldType(filterField);
         bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
         return bcfs;
     }
 
-    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType)
+    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType recordType, ARecordType metaType)
             throws AlgebricksException {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return null;
         }
+        Integer filterFieldSourceIndicator = getFilterSourceIndicator(dataset);
         List<String> filterField = getFilterField(dataset);
         if (filterField == null) {
             return null;
         }
+        ARecordType itemType = filterFieldSourceIndicator == 0 ? recordType : metaType;
         ITypeTraits[] typeTraits = new ITypeTraits[1];
         IAType type = itemType.getSubFieldType(filterField);
         typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
@@ -151,7 +159,8 @@ public class DatasetUtil {
         int numKeys = partitioningKeys.size();
 
         int[] filterFields = new int[1];
-        filterFields[0] = numKeys + 1;
+        int valueFields = dataset.hasMetaPart() ? 2 : 1;
+        filterFields[0] = numKeys + valueFields;
         return filterFields;
     }
 
@@ -471,9 +480,13 @@ public class DatasetUtil {
         }
         // add the previous filter third
         int fieldIdx = -1;
+        Integer filterSourceIndicator = null;
+        ARecordType filterItemType = null;
         if (numFilterFields > 0) {
+            filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(dataset);
             String filterField = DatasetUtil.getFilterField(dataset).get(0);
-            String[] fieldNames = itemType.getFieldNames();
+            filterItemType = filterSourceIndicator == 0 ? itemType : metaItemType;
+            String[] fieldNames = filterItemType.getFieldNames();
             int i = 0;
             for (; i < fieldNames.length; i++) {
                 if (fieldNames[i].equals(filterField)) {
@@ -481,9 +494,10 @@ public class DatasetUtil {
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] =
+                    dataFormat.getTypeTraitProvider().getTypeTrait(filterItemType.getFieldTypes()[fieldIdx]);
             outputSerDes[f] =
-                    dataFormat.getSerdeProvider().getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                    dataFormat.getSerdeProvider().getSerializerDeserializer(filterItemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
@@ -493,7 +507,8 @@ public class DatasetUtil {
         RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
         op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
-                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries);
+                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
+                fieldIdx, hasSecondaries);
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index f47786d..0cff284 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -27,6 +27,7 @@ import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
@@ -279,7 +280,13 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
             secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getDataFormat()
                     .getFieldAccessEvaluatorFactory(metadataProvider.getFunctionManager(), itemType, filterFieldName,
                             numPrimaryKeys, sourceLoc);
-            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            Pair<IAType, Boolean> keyTypePair;
+            // since filter is not null, it's safe to cast to internal
+            if (((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterSourceIndicator() == 0) {
+                keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            } else {
+                keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, metaType);
+            }
             IAType type = keyTypePair.first;
             ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
             secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index 3930563..01e1af2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -24,6 +24,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.operators.LSMSecondaryIndexBulkLoadOperatorDescriptor;
@@ -188,7 +189,13 @@ public class SecondaryCorrelatedBTreeOperationsHelper extends SecondaryCorrelate
             secondaryFieldAccessEvalFactories[numSecondaryKeys] =
                     metadataProvider.getDataFormat().getFieldAccessEvaluatorFactory(
                             metadataProvider.getFunctionManager(), itemType, filterFieldName, recordColumn, sourceLoc);
-            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            Pair<IAType, Boolean> keyTypePair;
+            // since filter is not null, it's safe to cast to internal
+            if (((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterSourceIndicator() == 0) {
+                keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            } else {
+                keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, metaType);
+            }
             IAType type = keyTypePair.first;
             ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
             secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 6bcc039..fd45ff4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -235,7 +235,9 @@ public abstract class SecondaryIndexOperationsHelper {
             secondaryBTreeFields[i] = i;
         }
 
-        IAType type = itemType.getSubFieldType(filterFieldName);
+        IAType type = ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterSourceIndicator() == 0
+                ? itemType.getSubFieldType(filterFieldName, itemType)
+                : metaType.getSubFieldType(filterFieldName, metaType);
         filterCmpFactories[0] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
         filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
         secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
index 58f03af..73eeae4 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
@@ -50,7 +50,7 @@ public class DatasetTupleTranslatorTest {
                     Collections.singletonList(Collections.singletonList("row_id")),
                     Collections.singletonList(Collections.singletonList("row_id")),
                     indicator == null ? null : Collections.singletonList(indicator),
-                    Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList());
+                    Collections.singletonList(BuiltinType.AINT64), false, null, null);
 
             Dataset dataset = new Dataset(DataverseName.createSinglePartName("test"), "log",
                     DataverseName.createSinglePartName("foo"), "LogType", DataverseName.createSinglePartName("CB"),
diff --git a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
index cc02c49..5fcb6e0 100644
--- a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
+++ b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
@@ -61,7 +61,7 @@ public class IndexTupleTranslatorTest {
                     Collections.singletonList(Collections.singletonList("row_id")),
                     Collections.singletonList(Collections.singletonList("row_id")),
                     indicator == null ? null : Collections.singletonList(indicator),
-                    Collections.singletonList(BuiltinType.AINT64), false, Collections.emptyList());
+                    Collections.singletonList(BuiltinType.AINT64), false, null, null);
 
             DataverseName dvTest = DataverseName.createSinglePartName("test");
             DataverseName dvFoo = DataverseName.createSinglePartName("foo");
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index 43f54f2..2adad12 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -37,7 +37,8 @@ public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOpera
 
     private static final long serialVersionUID = 1L;
     protected final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    protected final ARecordType recordType;
+    protected final Integer filterSourceIndicator;
+    protected final ARecordType filterItemType;
     protected final int filterIndex;
     protected ISearchOperationCallbackFactory searchOpCallbackFactory;
     protected final int numPrimaryKeys;
@@ -49,15 +50,16 @@ public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOpera
             IMissingWriterFactory missingWriterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackFactory,
-            IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, ARecordType recordType,
-            int filterIndex, boolean hasSecondaries) {
+            IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
+            ARecordType filterItemType, int filterIndex, boolean hasSecondaries) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
                 modificationOpCallbackFactory);
         this.frameOpCallbackFactory = frameOpCallbackFactory;
         this.searchOpCallbackFactory = searchOpCallbackFactory;
         this.numPrimaryKeys = numPrimaryKeys;
         this.missingWriterFactory = missingWriterFactory;
-        this.recordType = recordType;
+        this.filterSourceIndicator = filterSourceIndicator;
+        this.filterItemType = filterItemType;
         this.filterIndex = filterIndex;
         this.hasSecondaries = hasSecondaries;
     }
@@ -67,7 +69,7 @@ public class LSMPrimaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOpera
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
-                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, recordType, filterIndex,
-                frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
+                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
+                filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 316665d..f5bddc5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -96,7 +96,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     protected boolean isFiltered = false;
     private final ArrayTupleReference prevTupleWithFilter = new ArrayTupleReference();
     private ArrayTupleBuilder prevRecWithPKWithFilterValue;
-    private ARecordType recordType;
+    private Integer filterSourceIndicator = null;
+    private ARecordType filterItemType;
     private int presetFieldIndex = -1;
     private ARecordPointable recPointable;
     private DataOutput prevDos;
@@ -117,8 +118,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
-            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
-            int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
+            ARecordType filterItemType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
             IMissingWriterFactory missingWriterFactory, final boolean hasSecondaries) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
                 modCallbackFactory, null);
@@ -138,8 +139,9 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
         this.filterFieldIndex = numOfPrimaryKeys + (hasMeta ? 2 : 1);
         if (filterFieldIndex >= 0) {
             isFiltered = true;
-            this.recordType = recordType;
+            this.filterItemType = filterItemType;
             this.presetFieldIndex = filterFieldIndex;
+            this.filterSourceIndicator = filterSourceIndicator;
             this.recPointable = ARecordPointable.FACTORY.createPointable();
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
@@ -393,13 +395,20 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                 prevDos.write(prevTuple.getFieldData(i), prevTuple.getFieldStart(i), prevTuple.getFieldLength(i));
                 prevRecWithPKWithFilterValue.addFieldEndOffset();
             }
-            recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
-                    prevTuple.getFieldLength(numOfPrimaryKeys));
+
+            if (filterSourceIndicator == 0) {
+                recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
+                        prevTuple.getFieldLength(numOfPrimaryKeys));
+            } else {
+                recPointable.set(prevTuple.getFieldData(metaFieldIndex), prevTuple.getFieldStart(metaFieldIndex),
+                        prevTuple.getFieldLength(metaFieldIndex));
+            }
             // copy the field data from prevTuple
-            byte tag = recPointable.getClosedFieldType(recordType, presetFieldIndex).getTypeTag().serialize();
+            byte tag = recPointable.getClosedFieldType(filterItemType, presetFieldIndex).getTypeTag().serialize();
             prevDos.write(tag);
-            prevDos.write(recPointable.getByteArray(), recPointable.getClosedFieldOffset(recordType, presetFieldIndex),
-                    recPointable.getClosedFieldSize(recordType, presetFieldIndex));
+            prevDos.write(recPointable.getByteArray(),
+                    recPointable.getClosedFieldOffset(filterItemType, presetFieldIndex),
+                    recPointable.getClosedFieldSize(filterItemType, presetFieldIndex));
             prevRecWithPKWithFilterValue.addFieldEndOffset();
             // prepare the tuple
             prevTupleWithFilter.reset(prevRecWithPKWithFilterValue.getFieldEndOffsets(),