You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2023/10/30 22:24:52 UTC

Change in asterixdb[master]: [ASTERIXDB-3287][COMP] Introduce write operator

From Wail Alkowaileet <wa...@gmail.com>:

Wail Alkowaileet has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17891 )


Change subject: [ASTERIXDB-3287][COMP] Introduce write operator
......................................................................

[ASTERIXDB-3287][COMP] Introduce write operator

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Add the logical and physical write operators by reworking the
existing (deprecated) WriteOperator and SinkWritePOperator classes.

Change-Id: Ib4fca256c6bdfa4b83890c285f509d476f130a54
---
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
A hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
A asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
21 files changed, 939 insertions(+), 555 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/91/17891/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index bf86cc5..29984e6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.optimizer.rules.CheckFilterExpressionTypeRule;
 import org.apache.asterix.optimizer.rules.CheckFullParallelSortRule;
 import org.apache.asterix.optimizer.rules.CheckInsertUpsertReturningRule;
+import org.apache.asterix.optimizer.rules.CleanupWriteOperatorRule;
 import org.apache.asterix.optimizer.rules.ConstantFoldingRule;
 import org.apache.asterix.optimizer.rules.CountVarToCountOneRule;
 import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule;
@@ -357,6 +358,7 @@
         // RemoveRedundantBooleanExpressionsInJoinRule has to run first to probably eliminate the need for
         // introducing an assign operator in ExtractSimilarVariablesInJoinRule
         planCleanupRules.add(new ExtractRedundantVariablesInJoinRule());
+        planCleanupRules.add(new CleanupWriteOperatorRule());
 
         // Needs to invoke ByNameToByIndexFieldAccessRule as the last logical optimization rule because
         // some rules can push a FieldAccessByName to a place where the name it tries to access is in the closed part.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
new file mode 100644
index 0000000..459a29b
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CleanupWriteOperatorRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CleanupWriteOperatorRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.WRITE) {
+            return false;
+        }
+
+        WriteOperator writeOp = (WriteOperator) op;
+        ILogicalExpression pathExpr = writeOp.getPathExpression().getValue();
+        if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            return false;
+        }
+
+        boolean changed = false;
+        List<Mutable<ILogicalExpression>> partitionExprs = writeOp.getPartitionExpressions();
+        if (!partitionExprs.isEmpty()) {
+            // Useless partition expressions due to having a constant path expression
+            partitionExprs.clear();
+            writeOp.getOrderExpressions().clear();
+            changed = true;
+        }
+
+        return changed;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 256d481..343ff5d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -19,145 +19,20 @@
 
 package org.apache.asterix.optimizer.rules;
 
-import java.io.DataInputStream;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.WarningCollector;
-import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
-import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
-import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
-import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
-import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.om.base.ADouble;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.optimizer.rules.visitor.ConstantFoldingVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
-import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import org.apache.hyracks.util.LogRedactionUtil;
-
-import com.google.common.collect.ImmutableMap;
 
 public class ConstantFoldingRule implements IAlgebraicRewriteRule {
 
-    private final ConstantFoldingVisitor cfv = new ConstantFoldingVisitor();
-    private final JobGenContext jobGenCtx;
-
-    private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT = ImmutableMap
-            .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E), BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
-
-    /**
-     * Throws exceptions in substituteProducedVariable, setVarType, and one getVarType method.
-     */
-    private static final IVariableTypeEnvironment _emptyTypeEnv = new IVariableTypeEnvironment() {
-
-        @Override
-        public boolean substituteProducedVariable(LogicalVariable v1, LogicalVariable v2) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public void setVarType(LogicalVariable var, Object type) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public Object getVarType(LogicalVariable var, List<LogicalVariable> nonMissableVariables,
-                List<List<LogicalVariable>> correlatedMissableVariableLists, List<LogicalVariable> nonNullableVariables,
-                List<List<LogicalVariable>> correlatedNullableVariableLists) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public Object getVarType(LogicalVariable var) {
-            throw new IllegalStateException();
-        }
-
-        @Override
-        public Object getType(ILogicalExpression expr) throws AlgebricksException {
-            return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
-        }
-    };
-
-    private static final IOperatorSchema[] _emptySchemas = new IOperatorSchema[] {};
+    private final ConstantFoldingVisitor cfv;
 
     public ConstantFoldingRule(ICcApplicationContext appCtx) {
-        MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
-        jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
-                BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
-                BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
-                BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
-                ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
-                UnnestingPositionWriterFactory.INSTANCE, null,
-                new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
-                ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
-                NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig());
+        cfv = new ConstantFoldingVisitor(appCtx);
     }
 
     @Override
@@ -176,332 +51,4 @@
         cfv.reset(context);
         return op.acceptExpressionTransform(cfv);
     }
-
-    private class ConstantFoldingVisitor implements ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
-            ILogicalExpressionReferenceTransform, IEvaluatorContext {
-
-        private final IPointable p = VoidPointable.FACTORY.createPointable();
-        private final ByteBufferInputStream bbis = new ByteBufferInputStream();
-        private final DataInputStream dis = new DataInputStream(bbis);
-        private final WarningCollector warningCollector = new WarningCollector();
-        private IOptimizationContext optContext;
-        private IServiceContext serviceContext;
-
-        private void reset(IOptimizationContext context) {
-            optContext = context;
-            serviceContext =
-                    ((MetadataProvider) context.getMetadataProvider()).getApplicationContext().getServiceContext();
-        }
-
-        @Override
-        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
-            AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
-            Pair<Boolean, ILogicalExpression> newExpression = expr.accept(this, null);
-            if (newExpression.first) {
-                exprRef.setValue(newExpression.second);
-            }
-            return newExpression.first;
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> visitConstantExpression(ConstantExpression expr, Void arg) {
-            return new Pair<>(false, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> visitVariableReferenceExpression(VariableReferenceExpression expr,
-                Void arg) {
-            return new Pair<>(false, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
-                Void arg) throws AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            List<Mutable<ILogicalExpression>> argList = expr.getArguments();
-            int argConstantCount = countConstantArgs(argList);
-            FunctionIdentifier fid = expr.getFunctionIdentifier();
-            if (argConstantCount != argList.size()) {
-                if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) || BuiltinFunctions.AND.equals(fid))) {
-                    if (foldOrAndArgs(expr)) {
-                        ILogicalExpression changedExpr =
-                                expr.getArguments().size() == 1 ? expr.getArguments().get(0).getValue() : expr;
-                        return new Pair<>(true, changedExpr);
-                    }
-                }
-                return new Pair<>(changed, expr);
-            }
-
-            if (!expr.isFunctional() || !canConstantFold(expr)) {
-                return new Pair<>(changed, expr);
-            }
-
-            try {
-                if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
-                    IAType argType = (IAType) _emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
-                    if (argType.getTypeTag() == ATypeTag.OBJECT) {
-                        ARecordType rt = (ARecordType) argType;
-                        String str = ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
-                        int k = rt.getFieldIndex(str);
-                        if (k >= 0) {
-                            // wait for the ByNameToByIndex rule to apply
-                            return new Pair<>(changed, expr);
-                        }
-                    }
-                }
-                IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
-                if (c != null) {
-                    ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(c));
-                    constantExpression.setSourceLocation(expr.getSourceLocation());
-                    return new Pair<>(true, constantExpression);
-                }
-
-                IScalarEvaluatorFactory fact = jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
-                        _emptyTypeEnv, _emptySchemas, jobGenCtx);
-
-                warningCollector.clear();
-                IScalarEvaluator eval = fact.createScalarEvaluator(this);
-                eval.evaluate(null, p);
-                IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
-                ATypeTag runtimeType = PointableHelper.getTypeTag(p);
-                if (runtimeType.isDerivedType()) {
-                    returnType = TypeComputeUtils.getActualType(returnType);
-                } else {
-                    returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
-                }
-                @SuppressWarnings("rawtypes")
-                ISerializerDeserializer serde =
-                        jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
-                bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), p.getStartOffset(), p.getLength()), 0);
-                IAObject o = (IAObject) serde.deserialize(dis);
-                warningCollector.getWarnings(optContext.getWarningCollector());
-                ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(o));
-                constantExpression.setSourceLocation(expr.getSourceLocation());
-                return new Pair<>(true, constantExpression);
-            } catch (HyracksDataException | AlgebricksException e) {
-                if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
-                    AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught at constant folding: " + e, e);
-                }
-                return new Pair<>(false, null);
-            }
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> visitAggregateFunctionCallExpression(
-                AggregateFunctionCallExpression expr, Void arg) throws AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            return new Pair<>(changed, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> visitStatefulFunctionCallExpression(
-                StatefulFunctionCallExpression expr, Void arg) throws AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            return new Pair<>(changed, expr);
-        }
-
-        @Override
-        public Pair<Boolean, ILogicalExpression> visitUnnestingFunctionCallExpression(
-                UnnestingFunctionCallExpression expr, Void arg) throws AlgebricksException {
-            boolean changed = constantFoldArgs(expr, arg);
-            return new Pair<>(changed, expr);
-        }
-
-        private boolean constantFoldArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
-            return expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)
-                    ? foldRecordArgs(expr, arg) : foldFunctionArgs(expr, arg);
-        }
-
-        private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
-            boolean changed = false;
-            for (Mutable<ILogicalExpression> exprArgRef : expr.getArguments()) {
-                changed |= foldArg(exprArgRef, arg);
-            }
-            return changed;
-        }
-
-        private boolean foldRecordArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
-            if (expr.getArguments().size() % 2 != 0) {
-                String functionName = expr.getFunctionIdentifier().getName();
-                throw CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, expr.getSourceLocation(),
-                        functionName);
-            }
-            boolean changed = false;
-            Iterator<Mutable<ILogicalExpression>> iterator = expr.getArguments().iterator();
-            int fieldNameIdx = 0;
-            while (iterator.hasNext()) {
-                Mutable<ILogicalExpression> fieldNameExprRef = iterator.next();
-                Pair<Boolean, ILogicalExpression> fieldNameExpr = fieldNameExprRef.getValue().accept(this, arg);
-                boolean isDuplicate = false;
-                if (fieldNameExpr.first) {
-                    String fieldName = ConstantExpressionUtil.getStringConstant(fieldNameExpr.second);
-                    if (fieldName != null) {
-                        isDuplicate = isDuplicateField(fieldName, fieldNameIdx, expr.getArguments());
-                    }
-                    if (isDuplicate) {
-                        IWarningCollector warningCollector = optContext.getWarningCollector();
-                        if (warningCollector.shouldWarn()) {
-                            warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
-                                    ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(fieldName)));
-                        }
-                        iterator.remove();
-                        iterator.next();
-                        iterator.remove();
-                    } else {
-                        fieldNameExprRef.setValue(fieldNameExpr.second);
-                    }
-                    changed = true;
-                }
-                if (!isDuplicate) {
-                    Mutable<ILogicalExpression> fieldValue = iterator.next();
-                    changed |= foldArg(fieldValue, arg);
-                    fieldNameIdx += 2;
-                }
-            }
-            return changed;
-        }
-
-        private boolean isDuplicateField(String fName, int fIdx, List<Mutable<ILogicalExpression>> args) {
-            for (int i = 0, size = args.size(); i < size; i += 2) {
-                if (i != fIdx && fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue()))) {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void arg) throws AlgebricksException {
-            Pair<Boolean, ILogicalExpression> newExpr = exprArgRef.getValue().accept(this, arg);
-            if (newExpr.first) {
-                exprArgRef.setValue(newExpr.second);
-                return true;
-            }
-            return false;
-        }
-
-        private int countConstantArgs(List<Mutable<ILogicalExpression>> argList) {
-            int n = 0;
-            for (Mutable<ILogicalExpression> r : argList) {
-                if (r.getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT) {
-                    n++;
-                }
-            }
-            return n;
-        }
-
-        private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
-            // skip external functions because they're not available at compile time (on CC)
-            IFunctionInfo fi = function.getFunctionInfo();
-            if (fi.isExternal()) {
-                return false;
-            }
-            IAType returnType = (IAType) _emptyTypeEnv.getType(function);
-            // skip all functions that would produce records/arrays/multisets (derived types) in their open format
-            // this is because constant folding them will make them closed (currently)
-            if (function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
-                if (returnType.getTypeTag() != ATypeTag.OBJECT || ((ARecordType) returnType).isOpen()) {
-                    return false;
-                }
-            }
-            return canConstantFoldType(returnType);
-        }
-
-        private boolean canConstantFoldType(IAType returnType) {
-            ATypeTag tag = returnType.getTypeTag();
-            if (tag == ATypeTag.ANY) {
-                // if the function is to return a record (or derived data), that record would (should) be an open record
-                return false;
-            } else if (tag == ATypeTag.OBJECT) {
-                ARecordType recordType = (ARecordType) returnType;
-                if (recordType.isOpen()) {
-                    return false;
-                }
-                IAType[] fieldTypes = recordType.getFieldTypes();
-                for (int i = 0; i < fieldTypes.length; i++) {
-                    if (!canConstantFoldType(fieldTypes[i])) {
-                        return false;
-                    }
-                }
-            } else if (tag.isListType()) {
-                AbstractCollectionType listType = (AbstractCollectionType) returnType;
-                return canConstantFoldType(listType.getItemType());
-            } else if (tag == ATypeTag.UNION) {
-                return canConstantFoldType(((AUnionType) returnType).getActualType());
-            }
-            return true;
-        }
-
-        private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
-            // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
-            boolean changed = false;
-            List<Mutable<ILogicalExpression>> argList = expr.getArguments();
-            Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
-            Mutable<ILogicalExpression> argFalse = null;
-            while (argIter.hasNext()) {
-                Mutable<ILogicalExpression> argExprRef = argIter.next();
-                ILogicalExpression argExpr = argExprRef.getValue();
-                if (argExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
-                    continue;
-                }
-
-                ConstantExpression cExpr = (ConstantExpression) argExpr;
-                IAlgebricksConstantValue cValue = cExpr.getValue();
-                FunctionIdentifier fid = expr.getFunctionIdentifier();
-
-                if (replaceAndReturn(cValue, fid)) {
-                    // or(true,x,y) -> true;
-                    // and(false, x, y) -> false
-                    argList.clear();
-                    argList.add(argExprRef);
-                    return true;
-                } else if (removeAndContinue(cValue, fid)) {
-                    // or(false, x, y) -> or(x, y)
-                    // and(true, x, y) -> and(x, y)
-                    // remove 'false' (or 'true') from arg list, but save the expression.
-                    argFalse = argExprRef;
-                    argIter.remove();
-                    changed = true;
-                }
-            }
-            if (argList.isEmpty() && argFalse != null) {
-                argList.add(argFalse);
-            }
-            return changed;
-        }
-
-        private boolean replaceAndReturn(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
-            if (BuiltinFunctions.OR.equals(fid)) {
-                return cValue.isTrue();
-            } else {
-                // BuiltinFunctions.AND
-                return cValue.isFalse();
-            }
-        }
-
-        private boolean removeAndContinue(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
-            if (BuiltinFunctions.OR.equals(fid)) {
-                return cValue.isFalse();
-            } else {
-                // BuiltinFunctions.AND
-                return cValue.isTrue();
-            }
-        }
-
-        // IEvaluatorContext
-
-        @Override
-        public IServiceContext getServiceContext() {
-            return serviceContext;
-        }
-
-        @Override
-        public IHyracksTaskContext getTaskContext() {
-            return null;
-        }
-
-        @Override
-        public IWarningCollector getWarningCollector() {
-            return warningCollector;
-        }
-    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
new file mode 100644
index 0000000..0feb6c5
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/ConstantFoldingVisitor.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.visitor;
+
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningCollector;
+import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
+import org.apache.asterix.dataflow.data.nontagged.NullWriterFactory;
+import org.apache.asterix.formats.nontagged.ADMPrinterFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFamilyProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.runtime.base.UnnestingPositionWriterFactory;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.util.LogRedactionUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ConstantFoldingVisitor implements ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
+        ILogicalExpressionReferenceTransform, IEvaluatorContext {
+
+    /**
+     * Type environment that holds no variable types. Only {@code getType(ILogicalExpression)} is usable (it
+     * delegates to {@link ExpressionTypeComputer}); substituteProducedVariable, setVarType, and both getVarType
+     * overloads throw {@link IllegalStateException}.
+     */
+    private static final IVariableTypeEnvironment _emptyTypeEnv = new IVariableTypeEnvironment() {
+
+        @Override
+        public boolean substituteProducedVariable(LogicalVariable v1, LogicalVariable v2) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public void setVarType(LogicalVariable var, Object type) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public Object getVarType(LogicalVariable var, List<LogicalVariable> nonMissableVariables,
+                List<List<LogicalVariable>> correlatedMissableVariableLists, List<LogicalVariable> nonNullableVariables,
+                List<List<LogicalVariable>> correlatedNullableVariableLists) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public Object getVarType(LogicalVariable var) {
+            throw new IllegalStateException();
+        }
+
+        @Override
+        public Object getType(ILogicalExpression expr) throws AlgebricksException {
+            // the type computer is handed this environment itself for any nested lookups
+            return ExpressionTypeComputer.INSTANCE.getType(expr, null, this);
+        }
+    };
+
+    private static final IOperatorSchema[] _emptySchemas = new IOperatorSchema[] {};
+    private static final Map<FunctionIdentifier, IAObject> FUNC_ID_TO_CONSTANT = ImmutableMap
+            .of(BuiltinFunctions.NUMERIC_E, new ADouble(Math.E), BuiltinFunctions.NUMERIC_PI, new ADouble(Math.PI));
+    private final JobGenContext jobGenCtx;
+    private final IPointable p = VoidPointable.FACTORY.createPointable();
+    private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+    private final DataInputStream dis = new DataInputStream(bbis);
+    private final WarningCollector warningCollector = new WarningCollector();
+    private IOptimizationContext optContext;
+    private IServiceContext serviceContext;
+
+    /**
+     * Creates the visitor and the private {@link JobGenContext} from which evaluator factories for foldable
+     * expressions are later obtained.
+     *
+     * @param appCtx application context used to create the {@link MetadataProvider} and passed on to the job
+     *            generation context
+     */
+    public ConstantFoldingVisitor(ICcApplicationContext appCtx) {
+        MetadataProvider metadataProvider = MetadataProvider.createWithDefaultNamespace(appCtx);
+        // The arguments below are positional; the null entries are components this context is built without.
+        jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE,
+                BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE,
+                BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY,
+                BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE,
+                ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE,
+                UnnestingPositionWriterFactory.INSTANCE, null,
+                new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())),
+                ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null,
+                NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig());
+    }
+
+    public void reset(IOptimizationContext context) {
+        optContext = context;
+        serviceContext = ((MetadataProvider) context.getMetadataProvider()).getApplicationContext().getServiceContext();
+    }
+
+    @Override
+    public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+        Pair<Boolean, ILogicalExpression> newExpression = expr.accept(this, null);
+        if (newExpression.first) {
+            exprRef.setValue(newExpression.second);
+        }
+        return newExpression.first;
+    }
+
+    /**
+     * A constant needs no folding: it is returned as-is with the changed flag set to {@code false}.
+     */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitConstantExpression(ConstantExpression expr, Void arg) {
+        return new Pair<>(false, expr);
+    }
+
+    /**
+     * A variable reference is not folded: it is returned as-is with the changed flag set to {@code false}.
+     */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitVariableReferenceExpression(VariableReferenceExpression expr,
+            Void arg) {
+        return new Pair<>(false, expr);
+    }
+
+    /**
+     * Folds a scalar function call.
+     * <p>
+     * The arguments are folded first, in place. If some arguments are still non-constant, only the
+     * {@code or}/{@code and} simplification is attempted. If every argument is constant and the call is functional
+     * and foldable, the call is evaluated here and replaced by a {@link ConstantExpression} holding the result.
+     *
+     * @param expr the scalar function call; its argument list may be modified in place
+     * @param arg unused
+     * @return (changed flag, resulting expression); the resulting expression is {@code expr} itself unless the
+     *         call was replaced
+     * @throws AlgebricksException if folding the arguments or checking foldability fails
+     */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        boolean changed = constantFoldArgs(expr, arg);
+        List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+        int argConstantCount = countConstantArgs(argList);
+        FunctionIdentifier fid = expr.getFunctionIdentifier();
+        if (argConstantCount != argList.size()) {
+            // Not all arguments are constant: the call cannot be evaluated, but or/and can still be simplified.
+            if (argConstantCount > 0 && (BuiltinFunctions.OR.equals(fid) || BuiltinFunctions.AND.equals(fid))) {
+                if (foldOrAndArgs(expr)) {
+                    // a single remaining argument replaces the whole or/and call
+                    ILogicalExpression changedExpr =
+                            expr.getArguments().size() == 1 ? expr.getArguments().get(0).getValue() : expr;
+                    return new Pair<>(true, changedExpr);
+                }
+            }
+            return new Pair<>(changed, expr);
+        }
+
+        if (!expr.isFunctional() || !canConstantFold(expr)) {
+            return new Pair<>(changed, expr);
+        }
+
+        try {
+            if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
+                IAType argType = (IAType) _emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
+                if (argType.getTypeTag() == ATypeTag.OBJECT) {
+                    ARecordType rt = (ARecordType) argType;
+                    String str = ConstantExpressionUtil.getStringConstant(expr.getArguments().get(1).getValue());
+                    int k = rt.getFieldIndex(str);
+                    if (k >= 0) {
+                        // wait for the ByNameToByIndex rule to apply
+                        return new Pair<>(changed, expr);
+                    }
+                }
+            }
+            // functions with a predefined constant value need no evaluation
+            IAObject c = FUNC_ID_TO_CONSTANT.get(fid);
+            if (c != null) {
+                ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(c));
+                constantExpression.setSourceLocation(expr.getSourceLocation());
+                return new Pair<>(true, constantExpression);
+            }
+
+            IScalarEvaluatorFactory fact = jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
+                    _emptyTypeEnv, _emptySchemas, jobGenCtx);
+
+            // warnings raised during evaluation are collected locally and forwarded only if evaluation succeeds
+            warningCollector.clear();
+            IScalarEvaluator eval = fact.createScalarEvaluator(this);
+            eval.evaluate(null, p);
+            IAType returnType = (IAType) _emptyTypeEnv.getType(expr);
+            ATypeTag runtimeType = PointableHelper.getTypeTag(p);
+            if (runtimeType.isDerivedType()) {
+                returnType = TypeComputeUtils.getActualType(returnType);
+            } else {
+                returnType = TypeTagUtil.getBuiltinTypeByTag(runtimeType);
+            }
+            @SuppressWarnings("rawtypes")
+            ISerializerDeserializer serde =
+                    jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(returnType);
+            // NOTE(review): confirm ByteBufferInputStream honors the wrapped buffer's start offset when the
+            // pointable does not start at index 0.
+            bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), p.getStartOffset(), p.getLength()), 0);
+            IAObject o = (IAObject) serde.deserialize(dis);
+            warningCollector.getWarnings(optContext.getWarningCollector());
+            ConstantExpression constantExpression = new ConstantExpression(new AsterixConstantValue(o));
+            constantExpression.setSourceLocation(expr.getSourceLocation());
+            return new Pair<>(true, constantExpression);
+        } catch (HyracksDataException | AlgebricksException e) {
+            if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Exception caught at constant folding: " + e, e);
+            }
+            // Evaluation failed, so the call itself stays in place. Its arguments may already have been folded
+            // in place above, so report that through the flag, like every other non-folding exit of this
+            // method, instead of returning (false, null).
+            return new Pair<>(changed, expr);
+        }
+    }
+
+    /**
+     * Folds only the arguments of an aggregate call (in place); the call itself is returned unchanged.
+     *
+     * @return (whether any argument changed, {@code expr})
+     */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        return new Pair<>(constantFoldArgs(expr, arg), expr);
+    }
+
+    /**
+     * Folds only the arguments of a stateful call (in place); the call itself is returned unchanged.
+     *
+     * @return (whether any argument changed, {@code expr})
+     */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        return new Pair<>(constantFoldArgs(expr, arg), expr);
+    }
+
+    /**
+     * Folds only the arguments of an unnesting call (in place); the call itself is returned unchanged.
+     *
+     * @return (whether any argument changed, {@code expr})
+     */
+    @Override
+    public Pair<Boolean, ILogicalExpression> visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr,
+            Void arg) throws AlgebricksException {
+        return new Pair<>(constantFoldArgs(expr, arg), expr);
+    }
+
+    /**
+     * Folds the arguments of {@code expr} in place. The open record constructor goes through
+     * {@link #foldRecordArgs}; every other function goes through {@link #foldFunctionArgs}.
+     *
+     * @return {@code true} if any argument was changed
+     */
+    private boolean constantFoldArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        if (expr.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+            return foldRecordArgs(expr, arg);
+        }
+        return foldFunctionArgs(expr, arg);
+    }
+
+    /**
+     * Folds each argument of {@code expr} in place.
+     *
+     * @return {@code true} if at least one argument was replaced
+     */
+    private boolean foldFunctionArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        boolean anyFolded = false;
+        for (Mutable<ILogicalExpression> argRef : expr.getArguments()) {
+            // every argument is visited, even after an earlier one has already been folded
+            if (foldArg(argRef, arg)) {
+                anyFolded = true;
+            }
+        }
+        return anyFolded;
+    }
+
+    /**
+     * Folds the arguments of a record constructor, which are read as alternating (field name, field value)
+     * pairs. When folding a field name yields a string constant that already appears as another field name, a
+     * warning is reported to the optimization context (if it accepts warnings) and the name/value pair is
+     * removed from the argument list.
+     *
+     * @return {@code true} if any argument was folded or removed
+     * @throws AlgebricksException if the number of arguments is odd or folding an argument fails
+     */
+    private boolean foldRecordArgs(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> args = expr.getArguments();
+        if (args.size() % 2 != 0) {
+            String functionName = expr.getFunctionIdentifier().getName();
+            throw CompilationException.create(ErrorCode.COMPILATION_INVALID_NUM_OF_ARGS, expr.getSourceLocation(),
+                    functionName);
+        }
+        boolean modified = false;
+        // index of the current field name within the (possibly already shrunk) argument list
+        int nameIdx = 0;
+        Iterator<Mutable<ILogicalExpression>> it = args.iterator();
+        while (it.hasNext()) {
+            Mutable<ILogicalExpression> nameRef = it.next();
+            Pair<Boolean, ILogicalExpression> foldedName = nameRef.getValue().accept(this, arg);
+            if (foldedName.first) {
+                modified = true;
+                String name = ConstantExpressionUtil.getStringConstant(foldedName.second);
+                if (name != null && isDuplicateField(name, nameIdx, args)) {
+                    IWarningCollector optimizerWarnings = optContext.getWarningCollector();
+                    if (optimizerWarnings.shouldWarn()) {
+                        optimizerWarnings.warn(Warning.of(foldedName.second.getSourceLocation(),
+                                ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(name)));
+                    }
+                    // drop the duplicate name and the value that follows it; the index stays where it is
+                    it.remove();
+                    it.next();
+                    it.remove();
+                    continue;
+                }
+                nameRef.setValue(foldedName.second);
+            }
+            Mutable<ILogicalExpression> valueRef = it.next();
+            if (foldArg(valueRef, arg)) {
+                modified = true;
+            }
+            nameIdx += 2;
+        }
+        return modified;
+    }
+
+    private boolean isDuplicateField(String fName, int fIdx, List<Mutable<ILogicalExpression>> args) {
+        for (int i = 0, size = args.size(); i < size; i += 2) {
+            if (i != fIdx && fName.equals(ConstantExpressionUtil.getStringConstant(args.get(i).getValue()))) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Folds a single argument and, if it changed, stores the result back into its reference.
+     *
+     * @return {@code true} if the argument was replaced
+     */
+    private boolean foldArg(Mutable<ILogicalExpression> exprArgRef, Void arg) throws AlgebricksException {
+        Pair<Boolean, ILogicalExpression> folded = exprArgRef.getValue().accept(this, arg);
+        if (!folded.first) {
+            return false;
+        }
+        exprArgRef.setValue(folded.second);
+        return true;
+    }
+
+    /**
+     * @return the number of entries in {@code argList} whose expression tag is
+     *         {@link LogicalExpressionTag#CONSTANT}
+     */
+    private int countConstantArgs(List<Mutable<ILogicalExpression>> argList) {
+        return (int) argList.stream()
+                .filter(ref -> ref.getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT).count();
+    }
+
+    /**
+     * Decides whether a scalar call may be evaluated at compile time, based on where the function comes from
+     * and on its computed return type.
+     *
+     * @return {@code false} for external functions, for an open record constructor whose result is not a closed
+     *         record, and for any return type rejected by {@link #canConstantFoldType}
+     * @throws AlgebricksException if the return type cannot be computed
+     */
+    private boolean canConstantFold(ScalarFunctionCallExpression function) throws AlgebricksException {
+        // skip external functions because they're not available at compile time (on CC)
+        if (function.getFunctionInfo().isExternal()) {
+            return false;
+        }
+        IAType returnType = (IAType) _emptyTypeEnv.getType(function);
+        // skip all functions that would produce records/arrays/multisets (derived types) in their open format
+        // this is because constant folding them will make them closed (currently)
+        boolean isOpenRecordCtor = function.getFunctionIdentifier().equals(BuiltinFunctions.OPEN_RECORD_CONSTRUCTOR);
+        if (isOpenRecordCtor) {
+            boolean isClosedRecord =
+                    returnType.getTypeTag() == ATypeTag.OBJECT && !((ARecordType) returnType).isOpen();
+            if (!isClosedRecord) {
+                return false;
+            }
+        }
+        return canConstantFoldType(returnType);
+    }
+
+    /**
+     * Recursively checks whether a value of {@code returnType} may be folded: {@code ANY} and open records are
+     * rejected; closed records, lists and unions are accepted only if their field, item or actual types are
+     * accepted; every other type is accepted.
+     */
+    private boolean canConstantFoldType(IAType returnType) {
+        ATypeTag tag = returnType.getTypeTag();
+        if (tag == ATypeTag.ANY) {
+            // if the function is to return a record (or derived data), that record would (should) be an open record
+            return false;
+        }
+        if (tag == ATypeTag.OBJECT) {
+            ARecordType recordType = (ARecordType) returnType;
+            if (recordType.isOpen()) {
+                return false;
+            }
+            for (IAType fieldType : recordType.getFieldTypes()) {
+                if (!canConstantFoldType(fieldType)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        if (tag.isListType()) {
+            return canConstantFoldType(((AbstractCollectionType) returnType).getItemType());
+        }
+        if (tag == ATypeTag.UNION) {
+            return canConstantFoldType(((AUnionType) returnType).getActualType());
+        }
+        return true;
+    }
+
+    private boolean foldOrAndArgs(ScalarFunctionCallExpression expr) {
+        // or(true,x,y) -> true; or(false,x,y) -> or(x,y)
+        boolean changed = false;
+        List<Mutable<ILogicalExpression>> argList = expr.getArguments();
+        Iterator<Mutable<ILogicalExpression>> argIter = argList.iterator();
+        Mutable<ILogicalExpression> argFalse = null;
+        while (argIter.hasNext()) {
+            Mutable<ILogicalExpression> argExprRef = argIter.next();
+            ILogicalExpression argExpr = argExprRef.getValue();
+            if (argExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                continue;
+            }
+
+            ConstantExpression cExpr = (ConstantExpression) argExpr;
+            IAlgebricksConstantValue cValue = cExpr.getValue();
+            FunctionIdentifier fid = expr.getFunctionIdentifier();
+
+            if (replaceAndReturn(cValue, fid)) {
+                // or(true,x,y) -> true;
+                // and(false, x, y) -> false
+                argList.clear();
+                argList.add(argExprRef);
+                return true;
+            } else if (removeAndContinue(cValue, fid)) {
+                // or(false, x, y) -> or(x, y)
+                // and(true, x, y) -> and(x, y)
+                // remove 'false' (or 'true') from arg list, but save the expression.
+                argFalse = argExprRef;
+                argIter.remove();
+                changed = true;
+            }
+        }
+        if (argList.isEmpty() && argFalse != null) {
+            argList.add(argFalse);
+        }
+        return changed;
+    }
+
+    /**
+     * @return {@code true} if {@code cValue} decides the whole call: {@code true} for {@code or}, {@code false}
+     *         for any other function (callers pass {@code and})
+     */
+    private boolean replaceAndReturn(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
+        // anything that is not OR is treated as BuiltinFunctions.AND
+        return BuiltinFunctions.OR.equals(fid) ? cValue.isTrue() : cValue.isFalse();
+    }
+
+    /**
+     * @return {@code true} if {@code cValue} is neutral for the call and can be dropped: {@code false} for
+     *         {@code or}, {@code true} for any other function (callers pass {@code and})
+     */
+    private boolean removeAndContinue(IAlgebricksConstantValue cValue, FunctionIdentifier fid) {
+        // anything that is not OR is treated as BuiltinFunctions.AND
+        return BuiltinFunctions.OR.equals(fid) ? cValue.isFalse() : cValue.isTrue();
+    }
+
+    // IEvaluatorContext
+
+    /**
+     * @return the service context captured by the last call to {@link #reset}; {@code null} before the first
+     *         call
+     */
+    @Override
+    public IServiceContext getServiceContext() {
+        return serviceContext;
+    }
+
+    /**
+     * @return always {@code null}; this evaluator context does not provide a task context
+     */
+    @Override
+    public IHyracksTaskContext getTaskContext() {
+        return null;
+    }
+
+    /**
+     * @return this visitor's own warning collector, which is cleared before each compile-time evaluation and
+     *         whose warnings are forwarded to the optimization context after a successful one
+     */
+    @Override
+    public IWarningCollector getWarningCollector() {
+        return warningCollector;
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index fc63d89..28bce7e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -22,7 +22,6 @@
 import static org.apache.asterix.common.metadata.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS;
 import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -108,6 +107,7 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Quadruple;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -123,6 +123,7 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -134,8 +135,8 @@
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
@@ -740,19 +741,14 @@
     }
 
     @Override
-    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
-            RecordDescriptor inputDesc) {
-        FileSplitDataSink fsds = (FileSplitDataSink) sink;
-        FileSplitSinkId fssi = fsds.getId();
-        FileSplit fs = fssi.getFileSplit();
-        File outFile = new File(fs.getPath());
-        String nodeId = fs.getNodeName();
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(int sourceColumn,
+            int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression staticPathExpr,
+            SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
+            throws AlgebricksException {
 
-        SinkWriterRuntimeFactory runtime =
-                new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, writerFactory, inputDesc);
-        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
-        return new Pair<>(runtime, apc);
+        // TODO: implement the write runtime; until then every call throws NotImplementedException
+        throw new NotImplementedException();
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
new file mode 100644
index 0000000..753ac54
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+
+/**
+ * {@link IWriteDataSink} implementation that holds an adapter name and that adapter's configuration map.
+ */
+public class WriteDataSink implements IWriteDataSink {
+    private final String adapterName;
+    private final Map<String, String> configuration;
+
+    /**
+     * @param adapterName name of the adapter
+     * @param configuration adapter configuration; the map is stored as given, without copying
+     */
+    public WriteDataSink(String adapterName, Map<String, String> configuration) {
+        this.adapterName = adapterName;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Copy constructor: keeps the adapter name and copies the configuration entries into a new {@link HashMap}.
+     */
+    private WriteDataSink(WriteDataSink writeDataSink) {
+        this(writeDataSink.adapterName, new HashMap<>(writeDataSink.configuration));
+    }
+
+    @Override
+    public final String getAdapterName() {
+        return adapterName;
+    }
+
+    /**
+     * @return the configuration map held by this sink (the same instance, not a copy)
+     */
+    @Override
+    public final Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * @return a new sink with the same adapter name and its own copy of the configuration map
+     */
+    @Override
+    public IWriteDataSink createCopy() {
+        return new WriteDataSink(this);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
index cc05183..72f659d 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/ConstantExpressionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.om.utils;
 
 import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
 import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AOrderedList;
@@ -32,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ConstantExpressionUtil {
 
@@ -115,4 +117,22 @@
         return expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
                 ? getStringArgument((AbstractFunctionCallExpression) expr, index) : null;
     }
+
+    /**
+     * Creates a constant expression that wraps {@code value} in an {@link AString}.
+     *
+     * @param value the string constant
+     * @param sourceLocation source location to set on the returned expression
+     * @return the new constant expression
+     */
+    public static ConstantExpression create(String value, SourceLocation sourceLocation) {
+        return createExpression(new AString(value), sourceLocation);
+    }
+
+    /**
+     * Creates a constant expression that wraps {@code value} in an {@link AInt64}.
+     *
+     * @param value the integer constant
+     * @param sourceLocation source location to set on the returned expression
+     * @return the new constant expression
+     */
+    public static ConstantExpression create(long value, SourceLocation sourceLocation) {
+        return createExpression(new AInt64(value), sourceLocation);
+    }
+
+    /**
+     * Creates a constant expression that wraps {@code value} in an {@link ADouble}.
+     *
+     * @param value the double constant
+     * @param sourceLocation source location to set on the returned expression
+     * @return the new constant expression
+     */
+    public static ConstantExpression create(double value, SourceLocation sourceLocation) {
+        return createExpression(new ADouble(value), sourceLocation);
+    }
+
+    /**
+     * Wraps {@code value} in an {@link AsterixConstantValue}, builds a {@link ConstantExpression} from it and
+     * sets {@code sourceLocation} on the result.
+     */
+    private static ConstantExpression createExpression(IAObject value, SourceLocation sourceLocation) {
+        AsterixConstantValue constantValue = new AsterixConstantValue(value);
+        ConstantExpression expression = new ConstantExpression(constantValue);
+        expression.setSourceLocation(sourceLocation);
+        return expression;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4f3d8e4..2072dee 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -36,8 +36,11 @@
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -56,8 +59,10 @@
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo projectionFiltrationInfo) throws AlgebricksException;
 
-    Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, int[] printColumns,
-            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputDesc)
+    Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(int sourceColumn,
+            int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
+            IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression staticPathExpr,
+            SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
             throws AlgebricksException;
 
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
new file mode 100644
index 0000000..fa7a55e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IWriteDataSink.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.metadata;
+
+import java.util.Map;
+
+public interface IWriteDataSink { // sink description held by WriteOperator (see getWriteDataSink())
+    String getAdapterName(); // presumably names the adapter that performs the write - TODO confirm
+
+    Map<String, String> getConfiguration(); // string key/value settings; the keys are not defined in this patch
+
+    IWriteDataSink createCopy(); // called by OperatorDeepCopyVisitor when it deep-copies a WriteOperator
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 61b7796..7eef90e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -23,31 +23,77 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class WriteOperator extends AbstractLogicalOperator {
-    private List<Mutable<ILogicalExpression>> expressions;
-    private IDataSink dataSink;
+    private final Mutable<ILogicalExpression> sourceExpression;
+    private final Mutable<ILogicalExpression> pathExpression;
+    private final List<Mutable<ILogicalExpression>> partitionExpressions;
+    private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+    private final IWriteDataSink writeDataSink;
 
-    public WriteOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
-        this.expressions = expressions;
-        this.dataSink = dataSink;
+    public WriteOperator(Mutable<ILogicalExpression> sourceExpression, Mutable<ILogicalExpression> pathExpression,
+            List<Mutable<ILogicalExpression>> partitionExpressions,
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+            IWriteDataSink writeDataSink) {
+        this.sourceExpression = sourceExpression;
+        this.pathExpression = pathExpression;
+        this.partitionExpressions = partitionExpressions;
+        this.orderExpressions = orderExpressions;
+        this.writeDataSink = writeDataSink;
     }
 
-    public List<Mutable<ILogicalExpression>> getExpressions() {
-        return expressions;
+    public Mutable<ILogicalExpression> getSourceExpression() {
+        return sourceExpression;
     }
 
-    public IDataSink getDataSink() {
-        return dataSink;
+    public LogicalVariable getSourceVariable() {
+        return VariableUtilities.getVariable(sourceExpression.getValue()); // assumes a variable ref - TODO confirm
+    }
+
+    public Mutable<ILogicalExpression> getPathExpression() {
+        return pathExpression;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPartitionExpressions() {
+        return partitionExpressions;
+    }
+
+    public List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> getOrderExpressions() {
+        return orderExpressions;
+    }
+
+    public List<LogicalVariable> getPartitionVariables() {
+        List<LogicalVariable> partitionVariables = new ArrayList<>(); // a fresh list is built on every call
+        for (Mutable<ILogicalExpression> partitionExpression : partitionExpressions) {
+            partitionVariables.add(VariableUtilities.getVariable(partitionExpression.getValue()));
+        }
+        return partitionVariables; // same order as partitionExpressions
+    }
+
+    public List<OrderColumn> getOrderColumns() {
+        List<OrderColumn> orderColumns = new ArrayList<>(); // a fresh list is built on every call
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpressionPair : orderExpressions) {
+            LogicalVariable variable = VariableUtilities.getVariable(orderExpressionPair.getSecond().getValue());
+            OrderOperator.IOrder.OrderKind kind = orderExpressionPair.first.getKind(); // order kind of this key
+            orderColumns.add(new OrderColumn(variable, kind));
+        }
+        return orderColumns; // same order as orderExpressions
+    }
+
+    public IWriteDataSink getWriteDataSink() {
+        return writeDataSink;
     }
 
     @Override
@@ -62,35 +108,37 @@
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
-        boolean modif = false;
-        for (int i = 0; i < expressions.size(); i++) {
-            boolean b = visitor.transform(expressions.get(i));
-            if (b) {
-                modif = true;
-            }
+        boolean changed = visitor.transform(sourceExpression); // '|=' below: every expression is always visited
+        changed |= visitor.transform(pathExpression);
+
+        for (Mutable<ILogicalExpression> expression : partitionExpressions) {
+            changed |= visitor.transform(expression);
         }
-        return modif;
+
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> orderExpressionPair : orderExpressions) {
+            changed |= visitor.transform(orderExpressionPair.second);
+        }
+
+        return changed; // logical OR of all transform results
     }
 
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
+        return VariablePropagationPolicy.NONE; // NOTE(review): recomputeSchema() still copies input schema - confirm
     }
 
     @Override
     public boolean isMap() {
-        return false; // actually depends on the physical op.
+        return true;
     }
 
     @Override
     public void recomputeSchema() {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
+        schema = new ArrayList<>(inputs.get(0).getValue().getSchema());
     }
 
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 960e399..b3828df 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -289,9 +290,15 @@
 
     @Override
     public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
-        deepCopyExpressionRefs(newExpressions, op.getExpressions());
-        return new WriteOperator(newExpressions, op.getDataSink());
+        Mutable<ILogicalExpression> newSourceExpression = deepCopyExpressionRef(op.getSourceExpression());
+        Mutable<ILogicalExpression> newPathExpression = deepCopyExpressionRef(op.getPathExpression());
+        List<Mutable<ILogicalExpression>> newPartitionExpressions =
+                deepCopyExpressionRefs(new ArrayList<>(), op.getPartitionExpressions());
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderPairExpressions =
+                deepCopyOrderAndExpression(op.getOrderExpressions());
+        IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy(); // the sink supplies its own copy
+        return new WriteOperator(newSourceExpression, newPathExpression, newPartitionExpressions,
+                newOrderPairExpressions, writeDataSink);
     }
 
     @Override
@@ -369,11 +376,12 @@
         return new SinkOperator();
     }
 
-    private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
+    private List<Mutable<ILogicalExpression>> deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
             List<Mutable<ILogicalExpression>> oldExprs) {
         for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
             newExprs.add(new MutableObject<>(oldExpr.getValue().cloneExpression()));
         }
+        return newExprs; // the same list that was passed in, so the call can be used inline
     }
 
     private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExprRef) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 74de6f5..4067b62 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -254,7 +254,7 @@
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
-        standardLayout(op);
+        // Write adds no variables to the schema here (akin to an empty project)
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index faf3c11..7ceb812 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -380,7 +380,12 @@
     @Override
     public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getSourceExpression(), pair.first, pair.second); // same pair for every group
+        substUsedVariablesInExpr(op.getPathExpression(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getPartitionExpressions(), pair.first, pair.second);
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
+            substUsedVariablesInExpr(orderExpr.second, pair.first, pair.second); // expression half of the pair
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d7b6228..d7b2555 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -353,9 +353,15 @@
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Void arg) {
-        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+        op.getSourceExpression().getValue().getUsedVariables(usedVariables); // all four groups feed usedVariables
+        op.getPathExpression().getValue().getUsedVariables(usedVariables);
+        for (Mutable<ILogicalExpression> expr : op.getPartitionExpressions()) {
             expr.getValue().getUsedVariables(usedVariables);
         }
+
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
+            orderExpr.second.getValue().getUsedVariables(usedVariables); // expression half of the pair
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
index fcc8c8e..8ff605d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractWindowPOperator.java
@@ -92,25 +92,8 @@
                 throw new IllegalStateException(op.getExecutionMode().name());
         }
 
-        // require local order property [pc1, ... pcN, oc1, ... ocN]
-        // accounting for cases where there's an overlap between order and partition columns
-        // TODO replace with required local grouping on partition columns + local order on order columns
-        List<OrderColumn> lopColumns = new ArrayList<>();
-        ListSet<LogicalVariable> pcVars = new ListSet<>();
-        pcVars.addAll(partitionColumns);
-        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
-            OrderColumn oc = orderColumns.get(oIdx);
-            LogicalVariable ocVar = oc.getColumn();
-            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
-                throw AlgebricksException.create(ErrorCode.UNSUPPORTED_WINDOW_SPEC, op.getSourceLocation(),
-                        String.valueOf(partitionColumns), String.valueOf(orderColumns));
-            }
-            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
-        }
-        int pIdx = 0;
-        for (LogicalVariable pColumn : pcVars) {
-            lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
-        }
+        List<OrderColumn> lopColumns =
+                getOrderRequirement(op, ErrorCode.UNSUPPORTED_WINDOW_SPEC, partitionColumns, orderColumns);
         List<ILocalStructuralProperty> localProps =
                 lopColumns.isEmpty() ? null : Collections.singletonList(new LocalOrderProperty(lopColumns));
 
@@ -295,4 +278,30 @@
         }
         return false;
     }
+
+    static List<OrderColumn> getOrderRequirement(ILogicalOperator op, ErrorCode errorCode,
+            List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) throws AlgebricksException {
+        // require local order property [pc1, ... pcN, oc1, ... ocN]
+        // accounting for cases where there's an overlap between order and partition columns
+        // TODO replace with required local grouping on partition columns + local order on order columns
+        List<OrderColumn> lopColumns = new ArrayList<>();
+        ListSet<LogicalVariable> pcVars = new ListSet<>();
+        pcVars.addAll(partitionColumns); // partition columns not yet matched by an order column
+        for (int oIdx = 0, ln = orderColumns.size(); oIdx < ln; oIdx++) {
+            OrderColumn oc = orderColumns.get(oIdx);
+            LogicalVariable ocVar = oc.getColumn();
+            if (!pcVars.remove(ocVar) && containsAny(orderColumns, oIdx + 1, pcVars)) {
+                throw AlgebricksException.create(errorCode, op.getSourceLocation(), String.valueOf(partitionColumns),
+                        String.valueOf(orderColumns)); // presumably: partition column after a non-partition one
+            }
+            lopColumns.add(new OrderColumn(oc.getColumn(), oc.getOrder())); // keeps the requested order kind
+        }
+        int pIdx = 0; // insert position: leftover partition columns go first, in iteration order
+        for (LogicalVariable pColumn : pcVars) {
+            lopColumns.add(pIdx++, new OrderColumn(pColumn, OrderOperator.IOrder.OrderKind.ASC));
+        }
+
+        return lopColumns;
+    }
+
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 07c798f..d462cd5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import org.apache.commons.lang3.mutable.Mutable;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractWindowPOperator.getOrderRequirement;
+
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -30,23 +34,40 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 
 public class SinkWritePOperator extends AbstractPhysicalOperator {
+    private final LogicalVariable sourceVariable;
+    private final List<LogicalVariable> partitionVariables;
+    private final List<OrderColumn> orderColumns;
+
+    public SinkWritePOperator(LogicalVariable sourceVariable, List<LogicalVariable> partitionVariables,
+            List<OrderColumn> orderColumns) {
+        this.sourceVariable = sourceVariable;
+        this.partitionVariables = partitionVariables;
+        this.orderColumns = orderColumns;
+    }
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -66,12 +87,34 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        WriteOperator write = (WriteOperator) op;
-        IDataSink sink = write.getDataSink();
-        IPartitioningProperty pp = sink.getPartitioningProperty();
-        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
-        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            IPhysicalPropertiesVector reqByParent, IOptimizationContext context) throws AlgebricksException {
+        if (partitionVariables.isEmpty()) {
+            return emptyUnaryRequirements(); // no partition keys: presumably nothing is required of the input
+        }
+        IPartitioningProperty pp;
+        switch (op.getExecutionMode()) {
+            case PARTITIONED:
+                pp = UnorderedPartitionedProperty.of(new ListSet<>(partitionVariables),
+                        context.getComputationNodeDomain());
+                break;
+            case UNPARTITIONED:
+                pp = IPartitioningProperty.UNPARTITIONED;
+                break;
+            case LOCAL:
+                pp = null; // no partitioning property is requested in LOCAL mode
+                break;
+            default:
+                throw new IllegalStateException(op.getExecutionMode().name());
+        }
+
+        List<OrderColumn> finalOrderColumns =
+                getOrderRequirement(op, ErrorCode.UNSUPPORTED_WRITE_SPEC, partitionVariables, orderColumns);
+
+        List<ILocalStructuralProperty> localProps =
+                Collections.singletonList(new LocalOrderProperty(finalOrderColumns)); // non-empty: partition keys exist
+        return new PhysicalRequirements(
+                new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, localProps) },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
     @Override
@@ -79,29 +122,39 @@
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         WriteOperator write = (WriteOperator) op;
-        int[] columns = new int[write.getExpressions().size()];
-        int i = 0;
-        for (Mutable<ILogicalExpression> exprRef : write.getExpressions()) {
-            ILogicalExpression expr = exprRef.getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                throw new NotImplementedException("Only writing variable expressions is supported.");
-            }
-            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-            LogicalVariable v = varRef.getVariableReference();
-            columns[i++] = inputSchemas[0].findVariable(v);
+        IExpressionRuntimeProvider runtimeProvider = context.getExpressionRuntimeProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        IOperatorSchema schema = inputSchemas[0];
+        IWriteDataSink writeDataSink = write.getWriteDataSink();
+
+        // Source evaluator column
+        int sourceColumn = schema.findVariable(sourceVariable);
+
+        // Path expression
+        IScalarEvaluatorFactory dynamicPathEvalFactory = null;
+        ILogicalExpression staticPathExpr = null;
+        ILogicalExpression pathExpr = write.getPathExpression().getValue();
+        if (pathExpr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            dynamicPathEvalFactory = runtimeProvider.createEvaluatorFactory(pathExpr, typeEnv, inputSchemas, context);
+        } else {
+            staticPathExpr = pathExpr;
         }
+
+        // Partition columns
+        int[] partitionColumns = JobGenHelper.projectVariables(schema, partitionVariables);
+        IBinaryComparatorFactory[] partitionComparatorFactories =
+                JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, typeEnv, context);
+
         RecordDescriptor recDesc =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
 
-        IPrinterFactory[] pf =
-                JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns);
-
         IMetadataProvider<?, ?> mp = context.getMetadataProvider();
 
-        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints =
-                mp.getWriteFileRuntime(write.getDataSink(), columns, pf, context.getWriterFactory(), inputDesc);
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteFileRuntime(
+                sourceColumn, partitionColumns, partitionComparatorFactories, dynamicPathEvalFactory, staticPathExpr,
+                pathExpr.getSourceLocation(), writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
         IPushRuntimeFactory runtime = runtimeAndConstraints.first;
         runtime.setSourceLocation(write.getSourceLocation());
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f49b6d4..e1e8c50 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -266,8 +266,22 @@
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
-        addIndent(indent).append("write ");
-        pprintExprList(op.getExpressions(), indent);
+        AlgebricksStringBuilderWriter writer = addIndent(indent);
+        writer.append("write (");
+        writer.append(op.getSourceExpression().getValue().accept(exprVisitor, indent));
+        writer.append(") to path [");
+        writer.append(op.getPathExpression().getValue().accept(exprVisitor, indent));
+        writer.append("] ");
+        List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+        if (!partitionExpressions.isEmpty()) {
+            writer.append(" partition "); // NOTE(review): follows "] ", so two spaces are emitted in a row
+            pprintExprList(op.getPartitionExpressions(), indent);
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+            if (!orderExpressions.isEmpty()) { // order keys are printed only alongside partition keys
+                writer.append(" order ");
+                pprintOrderList(orderExpressions, indent);
+            }
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 6f65a9d..a60308a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -482,9 +482,20 @@
     public Void visitWriteOperator(WriteOperator op, Void indent) throws AlgebricksException {
         try {
             jsonGenerator.writeStringField(OPERATOR_FIELD, "write");
-            List<Mutable<ILogicalExpression>> expressions = op.getExpressions();
-            if (!expressions.isEmpty()) {
-                writeArrayFieldOfExpressions(EXPRESSIONS_FIELD, expressions, indent);
+
+            writeStringFieldExpression("value", op.getSourceExpression(), indent);
+            writeStringFieldExpression("path", op.getPathExpression(), indent);
+
+            List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+            if (!partitionExpressions.isEmpty()) {
+                writeObjectFieldWithExpressions("partition-by", partitionExpressions, indent);
+
+                List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions =
+                        op.getOrderExpressions();
+                if (!orderExpressions.isEmpty()) {
+                    writeArrayFieldOfOrderExprList("order-by", orderExpressions, indent);
+                }
+
             }
             return null;
         } catch (IOException e) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1cea0a9..1f36aa5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -230,8 +230,23 @@
     @Override
     public String visitWriteOperator(WriteOperator op, Boolean showDetails) {
         stringBuilder.setLength(0);
-        stringBuilder.append("write ");
-        printExprList(op.getExpressions());
+        stringBuilder.append("write (");
+        stringBuilder.append(op.getSourceExpression());
+        stringBuilder.append(") to [");
+        stringBuilder.append(op.getPathExpression());
+        stringBuilder.append(']');
+        List<Mutable<ILogicalExpression>> partitionExpressions = op.getPartitionExpressions();
+        if (!partitionExpressions.isEmpty()) {
+            stringBuilder.append(" partition by ");
+            printExprList(partitionExpressions);
+
+            List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions = op.getOrderExpressions();
+            if (!orderExpressions.isEmpty()) {
+                stringBuilder.append(" order ");
+                printOrderExprList(orderExpressions);
+            }
+        }
+
         appendSchema(op, showDetails);
         appendAnnotations(op, showDetails);
         appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 2784a6a..cc8c007 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -369,8 +369,14 @@
         }
 
         @Override
-        public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) {
-            return new SinkWritePOperator();
+        public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean topLevelOp) throws AlgebricksException {
+            ILogicalExpression sourceExpr = op.getSourceExpression().getValue();
+            if (sourceExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED, sourceExpr.getSourceLocation());
+            }
+            ensureAllVariables(op.getPartitionExpressions(), v -> v);
+            ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+            return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns());
         }
 
         @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7291473..af95280 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -154,6 +154,7 @@
     PARSING_ERROR(124),
     INVALID_INVERTED_LIST_TYPE_TRAITS(125),
     ILLEGAL_STATE(126),
+    UNSUPPORTED_WRITE_SPEC(127),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17891
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ib4fca256c6bdfa4b83890c285f509d476f130a54
Gerrit-Change-Number: 17891
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <wa...@gmail.com>
Gerrit-MessageType: newchange