You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2024/02/19 16:24:32 UTC

Change in asterixdb[master]: [ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO s...

From Hussain Towaileb <hu...@gmail.com>:

Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18169 )


Change subject: [ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
......................................................................

[ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This change provides the compiler and runtime support
for COPY TO statement to write to different destinations.

Change-Id: Icea1ff9f32fe49ee83d0739f5c5a305c3345faa7
---
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
R asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
C asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java
C asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java
C asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
C hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
C hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
40 files changed, 648 insertions(+), 315 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/69/18169/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 7dd0217..83b1373 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -361,7 +361,7 @@
         return translate(expr, outputDatasetName, (ICompiledDmlStatement) stmt, null, resultMetadata);
     }
 
-    private ILogicalPlan translateCopyTo(Query expr, CompiledStatements.ICompiledStatement stmt,
+    public ILogicalPlan translateCopyTo(Query expr, CompiledStatements.ICompiledStatement stmt,
             IResultMetadata resultMetadata) throws AlgebricksException {
         CompiledStatements.CompiledCopyToStatement copyTo = (CompiledStatements.CompiledCopyToStatement) stmt;
         MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -423,8 +423,7 @@
         // astPathExpressions has at least one expression see CopyToStatement constructor
         List<Expression> astPathExpressions = copyTo.getPathExpressions();
         ILogicalExpression fullPathExpr = null;
-        WriteDataSink writeDataSink;
-        String separator = String.valueOf(ExternalWriterProvider.getSeparator(copyTo.getAdapter()));
+        String separator = getExternalWriterSeparator(copyTo.getAdapter());
         List<Mutable<ILogicalExpression>> pathExprs = new ArrayList<>(astPathExpressions.size());
         Pair<ILogicalExpression, Mutable<ILogicalOperator>> pathExprPair;
         for (int i = 0; i < astPathExpressions.size(); i++) {
@@ -453,11 +452,24 @@
             fullPathExpr = concat;
         }
 
+        // Handle key
+        boolean autogenerated = copyTo.isAutogenerated();
+        List<Expression> astKeyExpressions = copyTo.getKeyExpressions();
+        List<Mutable<ILogicalExpression>> keyExpressionRefs = new ArrayList<>(astKeyExpressions.size());
+        for (int i = 0; i < copyTo.getKeyExpressions().size(); i++) {
+            Expression expression = astKeyExpressions.get(i);
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> expPair = langExprToAlgExpression(expression, topOpRef);
+            keyExpressionRefs.add(new MutableObject<>(expPair.first));
+            topOpRef = expPair.second;
+        }
+
         // Write adapter configuration
-        writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties());
+        WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties());
+
         // writeOperator
-        WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr),
-                partitionExpressionRefs, orderExprListOut, writeDataSink);
+        WriteOperator writeOperator =
+                new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr), partitionExpressionRefs,
+                        orderExprListOut, keyExpressionRefs, new MutableObject<>(autogenerated), writeDataSink);
         writeOperator.getInputs().add(topOpRef);
 
         // We need DistributeResultOperator to ensure all warnings to be delivered to the user
@@ -470,6 +482,10 @@
         return new ALogicalPlanImpl(globalPlanRoots);
     }
 
+    protected String getExternalWriterSeparator(String adapter) {
+        return String.valueOf(ExternalWriterProvider.getSeparator(adapter));
+    }
+
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
             ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
         MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index 5036fc8..a75984c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -37,11 +37,13 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
 import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -62,8 +64,8 @@
     public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
             new IExternalFileFilterWriterFactoryProvider() {
                 @Override
-                public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
-                    return new S3ExternalFileWriterFactory(configuration);
+                public IExternalFileWriterFactory create(ExternalWriterConfiguration configuration) {
+                    return new S3ExternalFileWriterFactory((ExternalFileWriterConfiguration) configuration);
                 }
 
                 @Override
@@ -84,14 +86,14 @@
     }
 
     @Override
-    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
         buildClient();
         String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        IExternalFilePrinter printer = printerFactory.createPrinter();
+        IExternalPrinter printer = printerFactory.createPrinter();
         IWarningCollector warningCollector = context.getWarningCollector();
-        return new S3ExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
-                pathSourceLocation);
+        return new S3ExternalFileWriter((IExternalFilePrinter) printer, cloudClient, bucket, staticPath == null,
+                warningCollector, pathSourceLocation);
     }
 
     private void buildClient() throws HyracksDataException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index 313757a..ef7355a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -24,10 +24,12 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,8 +41,8 @@
     public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
             new IExternalFileFilterWriterFactoryProvider() {
                 @Override
-                public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
-                    return new LocalFSExternalFileWriterFactory(configuration);
+                public IExternalFileWriterFactory create(ExternalWriterConfiguration configuration) {
+                    return new LocalFSExternalFileWriterFactory((ExternalFileWriterConfiguration) configuration);
                 }
 
                 @Override
@@ -63,7 +65,7 @@
     }
 
     @Override
-    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
         ILocalFSValidator validator = VALIDATOR;
         if (staticPath != null) {
@@ -72,7 +74,8 @@
             }
             validator = NO_OP_VALIDATOR;
         }
-        return new LocalFSExternalFileWriter(printerFactory.createPrinter(), validator, pathSourceLocation);
+        return new LocalFSExternalFileWriter((IExternalFilePrinter) printerFactory.createPrinter(), validator,
+                pathSourceLocation);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
index 8f8c63a..1132b6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -68,4 +68,9 @@
             delegate = null;
         }
     }
+
+    @Override
+    public OutputStream getOutputStream() {
+        return delegate;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
index 6778532..e3d0a66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -19,23 +19,21 @@
 package org.apache.asterix.external.writer.printer;
 
 import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 
-public class TextualExternalFilePrinterFactory implements IExternalFilePrinterFactory {
-    private static final long serialVersionUID = 9155959967258587588L;
-    private final IPrinterFactory printerFactory;
+public class TextualExternalFilePrinterFactory extends TextualExternalPrinterFactory {
+    private static final long serialVersionUID = 8971234908711234L;
     private final IExternalFileCompressStreamFactory compressStreamFactory;
 
     public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
             IExternalFileCompressStreamFactory compressStreamFactory) {
-        this.printerFactory = printerFactory;
+        super(printerFactory);
         this.compressStreamFactory = compressStreamFactory;
     }
 
     @Override
-    public IExternalFilePrinter createPrinter() {
+    public IExternalPrinter createPrinter() {
         return new TextualExternalFilePrinter(printerFactory.createPrinter(), compressStreamFactory);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
new file mode 100644
index 0000000..8c94fed
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class TextualExternalPrinter implements IExternalPrinter {
+    private final IPrinter printer;
+    private TextualOutputStreamDelegate delegate;
+    private PrintStream printStream;
+
+    TextualExternalPrinter(IPrinter printer) {
+        this.printer = printer;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.init();
+        delegate = new TextualOutputStreamDelegate(new ByteArrayOutputStream());
+        printStream = new PrintStream(delegate);
+    }
+
+    @Override
+    public void print(IValueReference value) throws HyracksDataException {
+        printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
+        delegate.checkError();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (printStream != null) {
+            printStream.close();
+            printStream = null;
+            delegate.checkError();
+            delegate = null;
+        }
+    }
+
+    public OutputStream getOutputStream() {
+        return delegate;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
new file mode 100644
index 0000000..d779793c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalPrinterFactory implements IExternalPrinterFactory {
+    private static final long serialVersionUID = 9155959967258587588L;
+    protected final IPrinterFactory printerFactory;
+
+    public TextualExternalPrinterFactory(IPrinterFactory printerFactory) {
+        this.printerFactory = printerFactory;
+    }
+
+    @Override
+    public IExternalPrinter createPrinter() {
+        return new TextualExternalPrinter(printerFactory.createPrinter());
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
index 2ec2661..6d27985 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
@@ -96,4 +96,9 @@
             throw HyracksDataException.create(exception);
         }
     }
+
+    @Override
+    public String toString() {
+        return stream.toString();
+    }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
index 2520755..4cd8276 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
@@ -89,7 +89,7 @@
         this.orderByModifiers = orderByModifiers;
         this.orderByNullModifierList = orderByNullModifierList;
         this.varCounter = varCounter;
-        this.keyExpressions = keyExpressions;
+        this.keyExpressions = keyExpressions != null ? keyExpressions : new ArrayList<>();
         this.autogenerated = autogenerated;
 
         if (pathExpressions.isEmpty()) {
@@ -235,11 +235,7 @@
         return autogenerated;
     }
 
-    public boolean isSinkFileStore() {
+    public boolean isFileStoreSink() {
         return keyExpressions.isEmpty() && !autogenerated;
     }
-
-    public boolean isSinkDatabaseWithKey() {
-        return !keyExpressions.isEmpty() || autogenerated;
-    }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 52e2678..6151b02 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -573,12 +573,10 @@
         cto.getSourceVariable().accept(this, step);
         out.println();
 
-        if (cto.isSinkFileStore()) {
+        if (cto.isFileStoreSink()) {
             formatPrintCopyToFileStore(cto, step);
-        } else if (cto.isSinkDatabaseWithKey()) {
-            formatPrintCopyToDatabaseWithKey(cto, step);
         } else {
-            throw new IllegalStateException("NYI: This should never happen");
+            formatPrintCopyToDatabaseWithKey(cto, step);
         }
 
         out.println("with ");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8d8ca80..4ded4d4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -106,9 +106,9 @@
 import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -139,7 +139,7 @@
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalFileWriterRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -178,7 +178,7 @@
 
 public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
 
-    private final ICcApplicationContext appCtx;
+    protected final ICcApplicationContext appCtx;
     private final IStorageComponentProvider storageComponentProvider;
     private final StorageProperties storageProperties;
     private final IFunctionManager functionManager;
@@ -761,15 +761,22 @@
         fileWriterFactory.validate();
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
-        IExternalFilePrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
-        ExternalWriterFactory writerFactory = new ExternalWriterFactory(fileWriterFactory, printerFactory,
+        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+        ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
                 fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
-        SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
-                partitionComparatorFactories, inputDesc, writerFactory);
+        SinkExternalFileWriterRuntimeFactory runtime = new SinkExternalFileWriterRuntimeFactory(sourceColumn,
+                partitionColumns, partitionComparatorFactories, inputDesc, writerFactory);
         return new Pair<>(runtime, null);
     }
 
     @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+            int[] keyColumns, IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated,
+            IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
             IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9142556..3338e66 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -32,8 +32,8 @@
 import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -65,7 +65,8 @@
             throw new UnsupportedOperationException("Unsupported adapter " + adapterName);
         }
 
-        return creator.create(createConfiguration(appCtx, sink, staticPath, pathExpressionLocation));
+        return (IExternalFileWriterFactory) creator
+                .create(createConfiguration(appCtx, sink, staticPath, pathExpressionLocation));
     }
 
     public static String getFileExtension(IWriteDataSink sink) {
@@ -105,7 +106,7 @@
         CREATOR_MAP.put(adapterName.toLowerCase(), creator);
     }
 
-    public static IExternalFilePrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+    public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
         Map<String, String> configuration = sink.getConfiguration();
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index 7105efa..c6d1dbe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -54,7 +54,7 @@
     @Override
     public String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException {
         if (!appendPrefix(tuple)) {
-            return ExternalWriter.UNRESOLVABLE_PATH;
+            return ExternalFileWriter.UNRESOLVABLE_PATH;
         }
 
         if (dirStringBuilder.length() > 0 && dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
new file mode 100644
index 0000000..2fe3ba7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+final class ExternalFileWriter implements IHyracksExternalFileWriter {
+    static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
+    private final IPathResolver pathResolver;
+    private final IExternalFileWriter writer;
+    private final int maxResultPerFile;
+    private String partitionPath;
+    private int tupleCounter;
+
+    public ExternalFileWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+        this.pathResolver = pathResolver;
+        this.writer = writer;
+        this.maxResultPerFile = maxResultPerFile;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+        partitionPath = pathResolver.getPartitionDirectory(tuple);
+        if (UNRESOLVABLE_PATH != partitionPath) {
+            writer.validate(partitionPath);
+            newFile();
+        }
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        if (UNRESOLVABLE_PATH == partitionPath) {
+            // Ignore writing values for unresolvable partition paths
+            return;
+        }
+        writer.write(value);
+        tupleCounter++;
+        if (tupleCounter >= maxResultPerFile) {
+            newFile();
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        writer.abort();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    private void newFile() throws HyracksDataException {
+        tupleCounter = 0;
+        if (!writer.newFile(partitionPath, pathResolver.getNextFileName())) {
+            // the partitionPath could contain illegal chars or the length of the total path is too long
+            partitionPath = UNRESOLVABLE_PATH;
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
index b62a07a..0f9974b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
@@ -22,24 +22,19 @@
 
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public final class ExternalFileWriterConfiguration {
-    private final Map<String, String> configuration;
+public final class ExternalFileWriterConfiguration extends ExternalWriterConfiguration {
     private final SourceLocation pathSourceLocation;
     private final String staticPath;
     private final boolean singleNodeCluster;
 
     public ExternalFileWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
             String staticPath, boolean singleNodeCluster) {
-        this.configuration = configuration;
+        super(configuration);
         this.pathSourceLocation = pathSourceLocation;
         this.staticPath = staticPath;
         this.singleNodeCluster = singleNodeCluster;
     }
 
-    public Map<String, String> getConfiguration() {
-        return configuration;
-    }
-
     public SourceLocation getPathSourceLocation() {
         return pathSourceLocation;
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
new file mode 100644
index 0000000..038127d
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class ExternalFileWriterFactory implements IHyracksExternalWriterFactory {
+    private static final long serialVersionUID = 1412969574113419638L;
+    private final IExternalFileWriterFactory writerFactory;
+    private final IExternalPrinterFactory printerFactory;
+    private final String fileExtension;
+    private final int maxResult;
+    private final IScalarEvaluatorFactory pathEvalFactory;
+    private final String staticPath;
+    private final SourceLocation pathSourceLocation;
+
+    public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory, IExternalPrinterFactory printerFactory,
+            String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
+            SourceLocation pathSourceLocation) {
+        this.writerFactory = writerFactory;
+        this.printerFactory = printerFactory;
+        this.fileExtension = fileExtension;
+        this.maxResult = maxResult;
+        this.pathEvalFactory = pathEvalFactory;
+        this.staticPath = staticPath;
+        this.pathSourceLocation = pathSourceLocation;
+    }
+
+    @Override
+    public IHyracksExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
+        int partition = context.getTaskAttemptId().getTaskId().getPartition();
+        char fileSeparator = writerFactory.getSeparator();
+        IPathResolver resolver;
+        if (staticPath == null) {
+            EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+            IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
+            IWarningCollector warningCollector = context.getWarningCollector();
+            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
+                    pathSourceLocation);
+        } else {
+            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
+        }
+        IExternalFileWriter writer = (IExternalFileWriter) writerFactory.createWriter(context, printerFactory);
+        return new ExternalFileWriter(resolver, writer, maxResult);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
index 5fc07af..7a06afa 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -18,23 +18,15 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-final class ExternalWriter implements IExternalWriter {
-    static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
-    private final IPathResolver pathResolver;
-    private final IExternalFileWriter writer;
-    private final int maxResultPerFile;
-    private String partitionPath;
-    private int tupleCounter;
+final class ExternalWriter implements IHyracksExternalWriter {
+    private final IExternalWriter writer;
 
-    public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
-        this.pathResolver = pathResolver;
+    public ExternalWriter(IExternalWriter writer) {
         this.writer = writer;
-        this.maxResultPerFile = maxResultPerFile;
     }
 
     @Override
@@ -43,25 +35,8 @@
     }
 
     @Override
-    public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
-        partitionPath = pathResolver.getPartitionDirectory(tuple);
-        if (UNRESOLVABLE_PATH != partitionPath) {
-            writer.validate(partitionPath);
-            newFile();
-        }
-    }
-
-    @Override
     public void write(IValueReference value) throws HyracksDataException {
-        if (UNRESOLVABLE_PATH == partitionPath) {
-            // Ignore writing values for unresolvable partition paths
-            return;
-        }
         writer.write(value);
-        tupleCounter++;
-        if (tupleCounter >= maxResultPerFile) {
-            newFile();
-        }
     }
 
     @Override
@@ -73,12 +48,4 @@
     public void close() throws HyracksDataException {
         writer.close();
     }
-
-    private void newFile() throws HyracksDataException {
-        tupleCounter = 0;
-        if (!writer.newFile(partitionPath, pathResolver.getNextFileName())) {
-            // the partitionPath could contain illegal chars or the length of the total path is too long
-            partitionPath = UNRESOLVABLE_PATH;
-        }
-    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
similarity index 71%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
index a4fa97b..abaaf68 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
@@ -18,14 +18,16 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import java.io.Serializable;
+import java.util.Map;
 
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
-    /**
-     * @return a new external file printer
-     */
-    IExternalFilePrinter createPrinter();
+public class ExternalWriterConfiguration {
+    private final Map<String, String> configuration;
+
+    public ExternalWriterConfiguration(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
index e7c0db0..2d1311f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -18,53 +18,24 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public class ExternalWriterFactory implements IExternalWriterFactory {
-    private static final long serialVersionUID = 1412969574113419638L;
-    private final IExternalFileWriterFactory writerFactory;
-    private final IExternalFilePrinterFactory printerFactory;
-    private final String fileExtension;
-    private final int maxResult;
-    private final IScalarEvaluatorFactory pathEvalFactory;
-    private final String staticPath;
-    private final SourceLocation pathSourceLocation;
+public class ExternalWriterFactory implements IHyracksExternalWriterFactory {
+    private static final long serialVersionUID = 1412969574113419639L;
+    private final IExternalWriterFactory writerFactory;
+    private final IExternalPrinterFactory printerFactory;
 
-    public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
-            String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
-            SourceLocation pathSourceLocation) {
+    public ExternalWriterFactory(IExternalWriterFactory writerFactory, IExternalPrinterFactory printerFactory) {
         this.writerFactory = writerFactory;
         this.printerFactory = printerFactory;
-        this.fileExtension = fileExtension;
-        this.maxResult = maxResult;
-        this.pathEvalFactory = pathEvalFactory;
-        this.staticPath = staticPath;
-        this.pathSourceLocation = pathSourceLocation;
     }
 
     @Override
-    public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
-        int partition = context.getTaskAttemptId().getTaskId().getPartition();
-        char fileSeparator = writerFactory.getSeparator();
-        IPathResolver resolver;
-        if (staticPath == null) {
-            EvaluatorContext evaluatorContext = new EvaluatorContext(context);
-            IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
-            IWarningCollector warningCollector = context.getWarningCollector();
-            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
-                    pathSourceLocation);
-        } else {
-            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
-        }
-        IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
-        return new ExternalWriter(resolver, writer, maxResult);
+    public IHyracksExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
+        IExternalWriter writer = writerFactory.createWriter(context, printerFactory);
+        return new ExternalWriter(writer);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
index 7a863f7..383f300 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -18,8 +18,7 @@
  */
 package org.apache.asterix.runtime.writer;
 
-public interface IExternalFileFilterWriterFactoryProvider {
-    IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration);
+public interface IExternalFileFilterWriterFactoryProvider extends IExternalFilterWriterFactoryProvider {
 
     char getSeparator();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
index ba5fa1d..8db70ae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -21,17 +21,11 @@
 import java.io.OutputStream;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
 
 /**
- * An {@link IExternalFileWriter} printer
+ * An {@link IExternalWriter} printer
  */
-public interface IExternalFilePrinter {
-
-    /**
-     * Open the printer
-     */
-    void open() throws HyracksDataException;
+public interface IExternalFilePrinter extends IExternalPrinter {
 
     /**
      * Initialize the printer with a new stream
@@ -39,16 +33,4 @@
      * @param outputStream to print to
      */
     void newStream(OutputStream outputStream) throws HyracksDataException;
-
-    /**
-     * Print the provided value
-     *
-     * @param value to print
-     */
-    void print(IValueReference value) throws HyracksDataException;
-
-    /**
-     * Flush and close the printer
-     */
-    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
index f8ae81b..197ac86 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -19,17 +19,11 @@
 package org.apache.asterix.runtime.writer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
 
 /**
  * A file writer
  */
-public interface IExternalFileWriter {
-
-    /**
-     * Open the writer
-     */
-    void open() throws HyracksDataException;
+public interface IExternalFileWriter extends IExternalWriter {
 
     /**
      * Validate the writing directory
@@ -46,21 +40,4 @@
      * @return true if a new file can be created, false otherwise
      */
     boolean newFile(String directory, String fileName) throws HyracksDataException;
-
-    /**
-     * Writer the provided value
-     *
-     * @param value to write
-     */
-    void write(IValueReference value) throws HyracksDataException;
-
-    /**
-     * Run the abort sequence in case of a failure
-     */
-    void abort() throws HyracksDataException;
-
-    /**
-     * Flush the final result and close the writer
-     */
-    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
index d8f1f84..9f42e94 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -18,34 +18,10 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An interface for writing to a storage device
- * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
- */
-public interface IExternalFileWriterFactory extends Serializable {
-    /**
-     * Create a writer
-     *
-     * @param context        task context
-     * @param printerFactory printer factory for writing the final result
-     * @return a new file writer
-     */
-    IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
-            throws HyracksDataException;
+public interface IExternalFileWriterFactory extends IExternalWriterFactory {
 
     /**
      * @return file (or path) separator
      */
     char getSeparator();
-
-    /**
-     * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
-     */
-    void validate() throws AlgebricksException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
similarity index 76%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
index a4fa97b..ca64544 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
@@ -18,14 +18,6 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import java.io.Serializable;
-
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
-    /**
-     * @return a new external file printer
-     */
-    IExternalFilePrinter createPrinter();
+public interface IExternalFilterWriterFactoryProvider {
+    IExternalWriterFactory create(ExternalWriterConfiguration configuration);
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
new file mode 100644
index 0000000..7cd8597
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.OutputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalWriter} printer
+ */
+public interface IExternalPrinter {
+
+    /**
+     * Open the printer
+     */
+    void open() throws HyracksDataException;
+
+    /**
+     * Print the provided value
+     *
+     * @param value to print
+     */
+    void print(IValueReference value) throws HyracksDataException;
+
+    /**
+     * Flush and close the printer
+     */
+    void close() throws HyracksDataException;
+
+    /**
+     * @return return the output stream
+     */
+    OutputStream getOutputStream();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
similarity index 86%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
index a4fa97b..4d9352a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
@@ -23,9 +23,9 @@
 /**
  * {@link IExternalFileWriter} printer factory
  */
-public interface IExternalFilePrinterFactory extends Serializable {
+public interface IExternalPrinterFactory extends Serializable {
     /**
-     * @return a new external file printer
+     * @return a new external printer
      */
-    IExternalFilePrinter createPrinter();
+    IExternalPrinter createPrinter();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
similarity index 74%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
index 81ed880..a98de02 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
@@ -16,32 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.runtime.writers;
+package org.apache.asterix.runtime.writer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
- * An external writer of a query (or dataset) result
+ * An external writer
  */
 public interface IExternalWriter {
+
     /**
      * Open the writer
      */
     void open() throws HyracksDataException;
 
     /**
-     * Initialize the writer for a new partition
+     * Writer the provided value
      *
-     * @param tuple which contains the partitioning columns
-     */
-    void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
-
-    /**
-     * Write the provided value
-     *
-     * @param value to be written
+     * @param value to write
      */
     void write(IValueReference value) throws HyracksDataException;
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java
new file mode 100644
index 0000000..bf09503
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for writing to a storage device
+ * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
+ */
+public interface IExternalWriterFactory extends Serializable {
+    /**
+     * Create a writer
+     *
+     * @param context        task context
+     * @param printerFactory printer factory for writing the final result
+     * @return a new file writer
+     */
+    IExternalWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+            throws HyracksDataException;
+
+    /**
+     * Perform the necessary validation to ensure the writer has the proper permissions
+     */
+    void validate() throws AlgebricksException;
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 2072dee..afacb59 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -65,6 +65,10 @@
             SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
             throws AlgebricksException;
 
+    Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+            int[] keyColumns, IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated,
+            IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException;
+
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
             IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
             IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 7eef90e..6ee509e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -41,17 +41,22 @@
     private final Mutable<ILogicalExpression> pathExpression;
     private final List<Mutable<ILogicalExpression>> partitionExpressions;
     private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+    private final List<Mutable<ILogicalExpression>> keyExpressions;
+    private final Mutable<Boolean> autogenerated;
     private final IWriteDataSink writeDataSink;
 
     public WriteOperator(Mutable<ILogicalExpression> sourceExpression, Mutable<ILogicalExpression> pathExpression,
             List<Mutable<ILogicalExpression>> partitionExpressions,
             List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+            List<Mutable<ILogicalExpression>> keyExpressions, Mutable<Boolean> autogenerated,
             IWriteDataSink writeDataSink) {
         this.sourceExpression = sourceExpression;
         this.pathExpression = pathExpression;
         this.partitionExpressions = partitionExpressions;
         this.orderExpressions = orderExpressions;
         this.writeDataSink = writeDataSink;
+        this.keyExpressions = keyExpressions;
+        this.autogenerated = autogenerated;
     }
 
     public Mutable<ILogicalExpression> getSourceExpression() {
@@ -74,6 +79,18 @@
         return orderExpressions;
     }
 
+    public List<Mutable<ILogicalExpression>> getKeyExpressions() {
+        return keyExpressions;
+    }
+
+    public List<LogicalVariable> getKeyVariables() {
+        List<LogicalVariable> keyVariables = new ArrayList<>();
+        for (Mutable<ILogicalExpression> keyExpression : keyExpressions) {
+            keyVariables.add(VariableUtilities.getVariable(keyExpression.getValue()));
+        }
+        return keyVariables;
+    }
+
     public List<LogicalVariable> getPartitionVariables() {
         List<LogicalVariable> partitionVariables = new ArrayList<>();
         for (Mutable<ILogicalExpression> partitionExpression : partitionExpressions) {
@@ -92,10 +109,18 @@
         return orderColumns;
     }
 
+    public boolean getAutogenerated() {
+        return autogenerated.getValue();
+    }
+
     public IWriteDataSink getWriteDataSink() {
         return writeDataSink;
     }
 
+    public boolean isFileStoreSink() {
+        return keyExpressions.isEmpty() && !autogenerated.getValue();
+    }
+
     @Override
     public LogicalOperatorTag getOperatorTag() {
         return LogicalOperatorTag.WRITE;
@@ -119,6 +144,10 @@
             changed |= visitor.transform(orderExpressionPair.second);
         }
 
+        for (Mutable<ILogicalExpression> expression : keyExpressions) {
+            changed |= visitor.transform(expression);
+        }
+
         return changed;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fa47ae5..a240b50 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -297,9 +297,12 @@
                 deepCopyExpressionRefs(new ArrayList<>(), op.getPartitionExpressions());
         List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderPairExpressions =
                 deepCopyOrderAndExpression(op.getOrderExpressions());
+        List<Mutable<ILogicalExpression>> newKeyPairExpressions =
+                deepCopyExpressionRefs(new ArrayList<>(), op.getKeyExpressions());
         IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
         return new WriteOperator(newSourceExpression, newPathExpression, newPartitionExpressions,
-                newOrderPairExpressions, writeDataSink);
+                newOrderPairExpressions, newKeyPairExpressions, new MutableObject<>(op.getAutogenerated()),
+                writeDataSink);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index d462cd5..6c87fdb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -61,12 +61,16 @@
     private final LogicalVariable sourceVariable;
     private final List<LogicalVariable> partitionVariables;
     private final List<OrderColumn> orderColumns;
+    private final List<LogicalVariable> keyVariables;
+    private final boolean autogenerated;
 
     public SinkWritePOperator(LogicalVariable sourceVariable, List<LogicalVariable> partitionVariables,
-            List<OrderColumn> orderColumns) {
+            List<OrderColumn> orderColumns, List<LogicalVariable> keyVariables, boolean autogenerated) {
         this.sourceVariable = sourceVariable;
         this.partitionVariables = partitionVariables;
         this.orderColumns = orderColumns;
+        this.keyVariables = keyVariables;
+        this.autogenerated = autogenerated;
     }
 
     @Override
@@ -145,6 +149,11 @@
         IBinaryComparatorFactory[] partitionComparatorFactories =
                 JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, typeEnv, context);
 
+        // Key columns
+        int[] keyColumns = JobGenHelper.projectVariables(schema, keyVariables);
+        IBinaryComparatorFactory[] keyComparatorFactories =
+                JobGenHelper.variablesToAscBinaryComparatorFactories(keyVariables, typeEnv, context);
+
         RecordDescriptor recDesc =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
@@ -152,9 +161,17 @@
 
         IMetadataProvider<?, ?> mp = context.getMetadataProvider();
 
-        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteFileRuntime(
-                sourceColumn, partitionColumns, partitionComparatorFactories, dynamicPathEvalFactory, staticPathExpr,
-                pathExpr.getSourceLocation(), writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints;
+        if (write.isFileStoreSink()) {
+            runtimeAndConstraints = mp.getWriteFileRuntime(sourceColumn, partitionColumns, partitionComparatorFactories,
+                    dynamicPathEvalFactory, staticPathExpr, pathExpr.getSourceLocation(), writeDataSink, inputDesc,
+                    typeEnv.getVarType(sourceVariable));
+
+        } else {
+            runtimeAndConstraints = mp.getWriteDatabaseWithKeyRuntime(sourceColumn, keyColumns, keyComparatorFactories,
+                    autogenerated, writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
+        }
+
         IPushRuntimeFactory runtime = runtimeAndConstraints.first;
         runtime.setSourceLocation(write.getSourceLocation());
 
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f945994..b440305 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -408,7 +408,8 @@
             }
             ensureAllVariables(op.getPartitionExpressions(), v -> v);
             ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
-            return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns());
+            return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns(),
+                    op.getKeyVariables(), op.getAutogenerated());
         }
 
         @Override
@@ -648,4 +649,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
similarity index 62%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
copy to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
index 01e137b..7ffc65d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
@@ -21,46 +21,40 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
-final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+final class SinkExternalDatabaseWithKeyWriterRuntime extends AbstractOneInputSinkPushRuntime {
     private final int sourceColumn;
-    private final int[] partitionColumns;
     private final IPointable sourceValue;
-    private final PointableTupleReference partitionColumnsPrevCopy;
-    private final PermutingFrameTupleReference partitionColumnsRef;
-    private final IBinaryComparator[] partitionComparators;
-    private final IExternalWriter writer;
+    private final int[] keyColumns;
+    private final IPointable[] keyValues;
+    private final IBinaryComparatorFactory[] keyComparatorFactories;
+    private final boolean autogenerated;
+    private final IHyracksExternalWriter writer;
     private FrameTupleAccessor tupleAccessor;
     private FrameTupleReference tupleRef;
-    private boolean first;
     private IFrameWriter frameWriter;
 
-    SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
-            RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+    SinkExternalDatabaseWithKeyWriterRuntime(int sourceColumn, int[] keyColumns,
+            IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated, RecordDescriptor inputRecordDesc,
+            IHyracksExternalWriter writer) {
         this.sourceColumn = sourceColumn;
-        this.partitionColumns = partitionColumns;
         this.sourceValue = new VoidPointable();
-        partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
-        partitionColumnsPrevCopy =
-                PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
-        this.partitionComparators = partitionComparators;
+        this.keyColumns = keyColumns;
+        this.keyValues = new VoidPointable[keyColumns.length];
+        this.keyComparatorFactories = keyComparatorFactories;
+        this.autogenerated = autogenerated;
         this.inputRecordDesc = inputRecordDesc;
         this.writer = writer;
-        first = true;
     }
 
     @Override
@@ -78,13 +72,8 @@
         tupleAccessor.reset(buffer);
         for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
             tupleRef.reset(tupleAccessor, i);
-            if (isNewPartition(i)) {
-                writer.initNewPartition(tupleRef);
-            }
             setValue(tupleRef, sourceColumn, sourceValue);
             writer.write(sourceValue);
-            partitionColumnsRef.reset(tupleAccessor, i);
-            partitionColumnsPrevCopy.set(partitionColumnsRef);
         }
     }
 
@@ -105,16 +94,6 @@
         this.frameWriter = frameWriter;
     }
 
-    private boolean isNewPartition(int index) throws HyracksDataException {
-        if (first) {
-            first = false;
-            return true;
-        }
-
-        return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, index, partitionColumns,
-                partitionComparators);
-    }
-
     private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
         byte[] data = tuple.getFieldData(column);
         int start = tuple.getFieldStart(column);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java
new file mode 100644
index 0000000..fede2ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class SinkExternalDatabaseWithKeyWriterRuntimeFactory extends AbstractPushRuntimeFactory {
+    private static final long serialVersionUID = -2215789207336628581L;
+    private final int sourceColumn;
+    private final int[] keyColumns;
+    private final IBinaryComparatorFactory[] keyComparatorFactories;
+    private final boolean autogenerated;
+    private final RecordDescriptor inputRecordDescriptor;
+    private final IHyracksExternalWriterFactory writerFactory;
+
+    public SinkExternalDatabaseWithKeyWriterRuntimeFactory(int sourceColumn, int[] keyColumns,
+            IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated,
+            RecordDescriptor inputRecordDescriptor, IHyracksExternalWriterFactory writerFactory) {
+        this.sourceColumn = sourceColumn;
+        this.keyColumns = keyColumns;
+        this.keyComparatorFactories = keyComparatorFactories;
+        this.autogenerated = autogenerated;
+        this.inputRecordDescriptor = inputRecordDescriptor;
+        this.writerFactory = writerFactory;
+    }
+
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        IHyracksExternalWriter writer = writerFactory.createWriter(ctx);
+        SinkExternalDatabaseWithKeyWriterRuntime runtime = new SinkExternalDatabaseWithKeyWriterRuntime(sourceColumn,
+                keyColumns, keyComparatorFactories, autogenerated, inputRecordDescriptor, writer);
+        return new IPushRuntime[] { runtime };
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
similarity index 89%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
index 01e137b..25c3bc0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
@@ -21,7 +21,8 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalFileWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -36,21 +37,21 @@
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 
-final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+final class SinkExternalFileWriterRuntime extends AbstractOneInputSinkPushRuntime {
     private final int sourceColumn;
     private final int[] partitionColumns;
     private final IPointable sourceValue;
     private final PointableTupleReference partitionColumnsPrevCopy;
     private final PermutingFrameTupleReference partitionColumnsRef;
     private final IBinaryComparator[] partitionComparators;
-    private final IExternalWriter writer;
+    private final IHyracksExternalFileWriter writer;
     private FrameTupleAccessor tupleAccessor;
     private FrameTupleReference tupleRef;
     private boolean first;
     private IFrameWriter frameWriter;
 
-    SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
-            RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+    SinkExternalFileWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
+            RecordDescriptor inputRecordDesc, IHyracksExternalWriter writer) {
         this.sourceColumn = sourceColumn;
         this.partitionColumns = partitionColumns;
         this.sourceValue = new VoidPointable();
@@ -59,7 +60,7 @@
                 PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
         this.partitionComparators = partitionComparators;
         this.inputRecordDesc = inputRecordDesc;
-        this.writer = writer;
+        this.writer = (IHyracksExternalFileWriter) writer;
         first = true;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
similarity index 79%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
index 6220dec..43bffe5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
@@ -20,25 +20,25 @@
 
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
+public final class SinkExternalFileWriterRuntimeFactory extends AbstractPushRuntimeFactory {
     private static final long serialVersionUID = -2215789207336628581L;
     private final int sourceColumn;
     private final int[] partitionColumn;
     private final IBinaryComparatorFactory[] partitionComparatorFactories;
     private final RecordDescriptor inputRecordDescriptor;
-    private final IExternalWriterFactory writerFactory;
+    private final IHyracksExternalWriterFactory writerFactory;
 
-    public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
+    public SinkExternalFileWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
             IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
-            IExternalWriterFactory writerFactory) {
+            IHyracksExternalWriterFactory writerFactory) {
         this.sourceColumn = sourceColumn;
         this.partitionColumn = partitionColumn;
         this.partitionComparatorFactories = partitionComparatorFactories;
@@ -48,12 +48,12 @@
 
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
-        IExternalWriter writer = writerFactory.createWriter(ctx);
+        IHyracksExternalWriter writer = writerFactory.createWriter(ctx);
         IBinaryComparator[] partitionComparators = new IBinaryComparator[partitionComparatorFactories.length];
         for (int i = 0; i < partitionComparatorFactories.length; i++) {
             partitionComparators[i] = partitionComparatorFactories[i].createBinaryComparator();
         }
-        SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn, partitionColumn,
+        SinkExternalFileWriterRuntime runtime = new SinkExternalFileWriterRuntime(sourceColumn, partitionColumn,
                 partitionComparators, inputRecordDescriptor, writer);
         return new IPushRuntime[] { runtime };
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
similarity index 67%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
copy to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
index e6899a6..3989394 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
@@ -18,21 +18,15 @@
  */
 package org.apache.hyracks.algebricks.runtime.writers;
 
-import java.io.Serializable;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-/**
- * A writer factory which creates a writer for the result of a query (or dataset) to a storage device
- */
-@FunctionalInterface
-public interface IExternalWriterFactory extends Serializable {
+public interface IHyracksExternalFileWriter extends IHyracksExternalWriter {
+
     /**
-     * Crete a new writer
+     * Initialize the writer for a new partition
      *
-     * @param context task context
-     * @return new writer
+     * @param tuple which contains the partitioning columns
      */
-    IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
+    void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
similarity index 82%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
index 81ed880..00826e4 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
@@ -20,25 +20,17 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * An external writer of a query (or dataset) result
  */
-public interface IExternalWriter {
+public interface IHyracksExternalWriter {
     /**
      * Open the writer
      */
     void open() throws HyracksDataException;
 
     /**
-     * Initialize the writer for a new partition
-     *
-     * @param tuple which contains the partitioning columns
-     */
-    void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
-
-    /**
      * Write the provided value
      *
      * @param value to be written
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
similarity index 88%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
index e6899a6..547fa9b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
@@ -27,12 +27,12 @@
  * A writer factory which creates a writer for the result of a query (or dataset) to a storage device
  */
 @FunctionalInterface
-public interface IExternalWriterFactory extends Serializable {
+public interface IHyracksExternalWriterFactory extends Serializable {
     /**
      * Crete a new writer
      *
      * @param context task context
      * @return new writer
      */
-    IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
+    IHyracksExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18169
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icea1ff9f32fe49ee83d0739f5c5a305c3345faa7
Gerrit-Change-Number: 18169
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <hu...@gmail.com>
Gerrit-MessageType: newchange