You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2024/02/19 16:24:32 UTC
Change in asterixdb[master]: [ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
From Hussain Towaileb <hu...@gmail.com>:
Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18169 )
Change subject: [ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
......................................................................
[ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This change provides the compiler and runtime support
for the COPY TO statement to write to different destinations.
Change-Id: Icea1ff9f32fe49ee83d0739f5c5a305c3345faa7
---
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
R asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
C asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java
C asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
R hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java
C asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
C hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
C hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
40 files changed, 648 insertions(+), 315 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/69/18169/1
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 7dd0217..83b1373 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -361,7 +361,7 @@
return translate(expr, outputDatasetName, (ICompiledDmlStatement) stmt, null, resultMetadata);
}
- private ILogicalPlan translateCopyTo(Query expr, CompiledStatements.ICompiledStatement stmt,
+ public ILogicalPlan translateCopyTo(Query expr, CompiledStatements.ICompiledStatement stmt,
IResultMetadata resultMetadata) throws AlgebricksException {
CompiledStatements.CompiledCopyToStatement copyTo = (CompiledStatements.CompiledCopyToStatement) stmt;
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -423,8 +423,7 @@
// astPathExpressions has at least one expression see CopyToStatement constructor
List<Expression> astPathExpressions = copyTo.getPathExpressions();
ILogicalExpression fullPathExpr = null;
- WriteDataSink writeDataSink;
- String separator = String.valueOf(ExternalWriterProvider.getSeparator(copyTo.getAdapter()));
+ String separator = getExternalWriterSeparator(copyTo.getAdapter());
List<Mutable<ILogicalExpression>> pathExprs = new ArrayList<>(astPathExpressions.size());
Pair<ILogicalExpression, Mutable<ILogicalOperator>> pathExprPair;
for (int i = 0; i < astPathExpressions.size(); i++) {
@@ -453,11 +452,24 @@
fullPathExpr = concat;
}
+ // Handle key
+ boolean autogenerated = copyTo.isAutogenerated();
+ List<Expression> astKeyExpressions = copyTo.getKeyExpressions();
+ List<Mutable<ILogicalExpression>> keyExpressionRefs = new ArrayList<>(astKeyExpressions.size());
+ for (int i = 0; i < copyTo.getKeyExpressions().size(); i++) {
+ Expression expression = astKeyExpressions.get(i);
+ Pair<ILogicalExpression, Mutable<ILogicalOperator>> expPair = langExprToAlgExpression(expression, topOpRef);
+ keyExpressionRefs.add(new MutableObject<>(expPair.first));
+ topOpRef = expPair.second;
+ }
+
// Write adapter configuration
- writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties());
+ WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties());
+
// writeOperator
- WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr),
- partitionExpressionRefs, orderExprListOut, writeDataSink);
+ WriteOperator writeOperator =
+ new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr), partitionExpressionRefs,
+ orderExprListOut, keyExpressionRefs, new MutableObject<>(autogenerated), writeDataSink);
writeOperator.getInputs().add(topOpRef);
// We need DistributeResultOperator to ensure all warnings to be delivered to the user
@@ -470,6 +482,10 @@
return new ALogicalPlanImpl(globalPlanRoots);
}
+ protected String getExternalWriterSeparator(String adapter) {
+ return String.valueOf(ExternalWriterProvider.getSeparator(adapter));
+ }
+
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index 5036fc8..a75984c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -37,11 +37,13 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -62,8 +64,8 @@
public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
new IExternalFileFilterWriterFactoryProvider() {
@Override
- public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
- return new S3ExternalFileWriterFactory(configuration);
+ public IExternalFileWriterFactory create(ExternalWriterConfiguration configuration) {
+ return new S3ExternalFileWriterFactory((ExternalFileWriterConfiguration) configuration);
}
@Override
@@ -84,14 +86,14 @@
}
@Override
- public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException {
buildClient();
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- IExternalFilePrinter printer = printerFactory.createPrinter();
+ IExternalPrinter printer = printerFactory.createPrinter();
IWarningCollector warningCollector = context.getWarningCollector();
- return new S3ExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
- pathSourceLocation);
+ return new S3ExternalFileWriter((IExternalFilePrinter) printer, cloudClient, bucket, staticPath == null,
+ warningCollector, pathSourceLocation);
}
private void buildClient() throws HyracksDataException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index 313757a..ef7355a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -24,10 +24,12 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,8 +41,8 @@
public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
new IExternalFileFilterWriterFactoryProvider() {
@Override
- public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
- return new LocalFSExternalFileWriterFactory(configuration);
+ public IExternalFileWriterFactory create(ExternalWriterConfiguration configuration) {
+ return new LocalFSExternalFileWriterFactory((ExternalFileWriterConfiguration) configuration);
}
@Override
@@ -63,7 +65,7 @@
}
@Override
- public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException {
ILocalFSValidator validator = VALIDATOR;
if (staticPath != null) {
@@ -72,7 +74,8 @@
}
validator = NO_OP_VALIDATOR;
}
- return new LocalFSExternalFileWriter(printerFactory.createPrinter(), validator, pathSourceLocation);
+ return new LocalFSExternalFileWriter((IExternalFilePrinter) printerFactory.createPrinter(), validator,
+ pathSourceLocation);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
index 8f8c63a..1132b6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -68,4 +68,9 @@
delegate = null;
}
}
+
+ @Override
+ public OutputStream getOutputStream() {
+ return delegate;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
index 6778532..e3d0a66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -19,23 +19,21 @@
package org.apache.asterix.external.writer.printer;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
-public class TextualExternalFilePrinterFactory implements IExternalFilePrinterFactory {
- private static final long serialVersionUID = 9155959967258587588L;
- private final IPrinterFactory printerFactory;
+public class TextualExternalFilePrinterFactory extends TextualExternalPrinterFactory {
+ private static final long serialVersionUID = 8971234908711234L;
private final IExternalFileCompressStreamFactory compressStreamFactory;
public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
IExternalFileCompressStreamFactory compressStreamFactory) {
- this.printerFactory = printerFactory;
+ super(printerFactory);
this.compressStreamFactory = compressStreamFactory;
}
@Override
- public IExternalFilePrinter createPrinter() {
+ public IExternalPrinter createPrinter() {
return new TextualExternalFilePrinter(printerFactory.createPrinter(), compressStreamFactory);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
new file mode 100644
index 0000000..8c94fed
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class TextualExternalPrinter implements IExternalPrinter {
+ private final IPrinter printer;
+ private TextualOutputStreamDelegate delegate;
+ private PrintStream printStream;
+
+ TextualExternalPrinter(IPrinter printer) {
+ this.printer = printer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.init();
+ delegate = new TextualOutputStreamDelegate(new ByteArrayOutputStream());
+ printStream = new PrintStream(delegate);
+ }
+
+ @Override
+ public void print(IValueReference value) throws HyracksDataException {
+ printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
+ delegate.checkError();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (printStream != null) {
+ printStream.close();
+ printStream = null;
+ delegate.checkError();
+ delegate = null;
+ }
+ }
+
+ public OutputStream getOutputStream() {
+ return delegate;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
new file mode 100644
index 0000000..d779793c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalPrinterFactory implements IExternalPrinterFactory {
+ private static final long serialVersionUID = 9155959967258587588L;
+ protected final IPrinterFactory printerFactory;
+
+ public TextualExternalPrinterFactory(IPrinterFactory printerFactory) {
+ this.printerFactory = printerFactory;
+ }
+
+ @Override
+ public IExternalPrinter createPrinter() {
+ return new TextualExternalPrinter(printerFactory.createPrinter());
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
index 2ec2661..6d27985 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
@@ -96,4 +96,9 @@
throw HyracksDataException.create(exception);
}
}
+
+ @Override
+ public String toString() {
+ return stream.toString();
+ }
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
index 2520755..4cd8276 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
@@ -89,7 +89,7 @@
this.orderByModifiers = orderByModifiers;
this.orderByNullModifierList = orderByNullModifierList;
this.varCounter = varCounter;
- this.keyExpressions = keyExpressions;
+ this.keyExpressions = keyExpressions != null ? keyExpressions : new ArrayList<>();
this.autogenerated = autogenerated;
if (pathExpressions.isEmpty()) {
@@ -235,11 +235,7 @@
return autogenerated;
}
- public boolean isSinkFileStore() {
+ public boolean isFileStoreSink() {
return keyExpressions.isEmpty() && !autogenerated;
}
-
- public boolean isSinkDatabaseWithKey() {
- return !keyExpressions.isEmpty() || autogenerated;
- }
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 52e2678..6151b02 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -573,12 +573,10 @@
cto.getSourceVariable().accept(this, step);
out.println();
- if (cto.isSinkFileStore()) {
+ if (cto.isFileStoreSink()) {
formatPrintCopyToFileStore(cto, step);
- } else if (cto.isSinkDatabaseWithKey()) {
- formatPrintCopyToDatabaseWithKey(cto, step);
} else {
- throw new IllegalStateException("NYI: This should never happen");
+ formatPrintCopyToDatabaseWithKey(cto, step);
}
out.println("with ");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8d8ca80..4ded4d4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -106,9 +106,9 @@
import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -139,7 +139,7 @@
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.writer.SinkExternalFileWriterRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -178,7 +178,7 @@
public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
- private final ICcApplicationContext appCtx;
+ protected final ICcApplicationContext appCtx;
private final IStorageComponentProvider storageComponentProvider;
private final StorageProperties storageProperties;
private final IFunctionManager functionManager;
@@ -761,15 +761,22 @@
fileWriterFactory.validate();
String fileExtension = ExternalWriterProvider.getFileExtension(sink);
int maxResult = ExternalWriterProvider.getMaxResult(sink);
- IExternalFilePrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
- ExternalWriterFactory writerFactory = new ExternalWriterFactory(fileWriterFactory, printerFactory,
+ IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+ ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
- SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
- partitionComparatorFactories, inputDesc, writerFactory);
+ SinkExternalFileWriterRuntimeFactory runtime = new SinkExternalFileWriterRuntimeFactory(sourceColumn,
+ partitionColumns, partitionComparatorFactories, inputDesc, writerFactory);
return new Pair<>(runtime, null);
}
@Override
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+ int[] keyColumns, IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated,
+ IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9142556..3338e66 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -32,8 +32,8 @@
import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -65,7 +65,8 @@
throw new UnsupportedOperationException("Unsupported adapter " + adapterName);
}
- return creator.create(createConfiguration(appCtx, sink, staticPath, pathExpressionLocation));
+ return (IExternalFileWriterFactory) creator
+ .create(createConfiguration(appCtx, sink, staticPath, pathExpressionLocation));
}
public static String getFileExtension(IWriteDataSink sink) {
@@ -105,7 +106,7 @@
CREATOR_MAP.put(adapterName.toLowerCase(), creator);
}
- public static IExternalFilePrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+ public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
Map<String, String> configuration = sink.getConfiguration();
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index 7105efa..c6d1dbe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -54,7 +54,7 @@
@Override
public String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException {
if (!appendPrefix(tuple)) {
- return ExternalWriter.UNRESOLVABLE_PATH;
+ return ExternalFileWriter.UNRESOLVABLE_PATH;
}
if (dirStringBuilder.length() > 0 && dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
new file mode 100644
index 0000000..2fe3ba7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+final class ExternalFileWriter implements IHyracksExternalFileWriter { // resolves a directory per partition and rolls over to new files on the wrapped writer
+ static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH"; // sentinel: compared by reference (== / !=) below, never with equals()
+ private final IPathResolver pathResolver;
+ private final IExternalFileWriter writer;
+ private final int maxResultPerFile; // once this many values were written, write() starts a new file
+ private String partitionPath; // current partition directory, or the UNRESOLVABLE_PATH sentinel
+ private int tupleCounter; // values written to the current file; reset by newFile()
+
+ public ExternalFileWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+ this.pathResolver = pathResolver;
+ this.writer = writer;
+ this.maxResultPerFile = maxResultPerFile;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+ partitionPath = pathResolver.getPartitionDirectory(tuple);
+ if (UNRESOLVABLE_PATH != partitionPath) { // intentional identity check: DynamicPathResolver returns this exact constant on failure
+ writer.validate(partitionPath);
+ newFile();
+ }
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ if (UNRESOLVABLE_PATH == partitionPath) { // intentional identity check against the sentinel instance
+ // Ignore writing values for unresolvable partition paths
+ return;
+ }
+ writer.write(value);
+ tupleCounter++;
+ if (tupleCounter >= maxResultPerFile) { // per-file limit reached: continue in a new file
+ newFile();
+ }
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ writer.abort();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ private void newFile() throws HyracksDataException { // resets the counter and asks the writer for the next file of the current partition
+ tupleCounter = 0;
+ if (!writer.newFile(partitionPath, pathResolver.getNextFileName())) {
+ // the partitionPath could contain illegal chars or the length of the total path is too long
+ partitionPath = UNRESOLVABLE_PATH; // write() skips values until initNewPartition() resolves a new path
+ }
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
index b62a07a..0f9974b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
@@ -22,24 +22,19 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
-public final class ExternalFileWriterConfiguration {
- private final Map<String, String> configuration;
+public final class ExternalFileWriterConfiguration extends ExternalWriterConfiguration {
private final SourceLocation pathSourceLocation;
private final String staticPath;
private final boolean singleNodeCluster;
public ExternalFileWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
String staticPath, boolean singleNodeCluster) {
- this.configuration = configuration;
+ super(configuration);
this.pathSourceLocation = pathSourceLocation;
this.staticPath = staticPath;
this.singleNodeCluster = singleNodeCluster;
}
- public Map<String, String> getConfiguration() {
- return configuration;
- }
-
public SourceLocation getPathSourceLocation() {
return pathSourceLocation;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
new file mode 100644
index 0000000..038127d
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class ExternalFileWriterFactory implements IHyracksExternalWriterFactory { // builds one ExternalFileWriter per createWriter() call
+ private static final long serialVersionUID = 1412969574113419638L;
+ private final IExternalFileWriterFactory writerFactory;
+ private final IExternalPrinterFactory printerFactory;
+ private final String fileExtension;
+ private final int maxResult; // handed to ExternalFileWriter as its per-file value limit
+ private final IScalarEvaluatorFactory pathEvalFactory; // only used when staticPath is null
+ private final String staticPath; // null selects the evaluator-based DynamicPathResolver
+ private final SourceLocation pathSourceLocation;
+
+ public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory, IExternalPrinterFactory printerFactory,
+ String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
+ SourceLocation pathSourceLocation) {
+ this.writerFactory = writerFactory;
+ this.printerFactory = printerFactory;
+ this.fileExtension = fileExtension;
+ this.maxResult = maxResult;
+ this.pathEvalFactory = pathEvalFactory;
+ this.staticPath = staticPath;
+ this.pathSourceLocation = pathSourceLocation;
+ }
+
+ @Override
+ public IHyracksExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
+ int partition = context.getTaskAttemptId().getTaskId().getPartition(); // partition of the running task, passed to the path resolver
+ char fileSeparator = writerFactory.getSeparator();
+ IPathResolver resolver;
+ if (staticPath == null) { // no static path: resolve directories through the path expression evaluator
+ EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+ IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
+ IWarningCollector warningCollector = context.getWarningCollector();
+ resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
+ pathSourceLocation);
+ } else {
+ resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
+ }
+ IExternalFileWriter writer = (IExternalFileWriter) writerFactory.createWriter(context, printerFactory); // createWriter is declared on IExternalWriterFactory and returns IExternalWriter, hence the downcast
+ return new ExternalFileWriter(resolver, writer, maxResult);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
index 5fc07af..7a06afa 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -18,23 +18,15 @@
*/
package org.apache.asterix.runtime.writer;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-final class ExternalWriter implements IExternalWriter {
- static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
- private final IPathResolver pathResolver;
- private final IExternalFileWriter writer;
- private final int maxResultPerFile;
- private String partitionPath;
- private int tupleCounter;
+final class ExternalWriter implements IHyracksExternalWriter {
+ private final IExternalWriter writer;
- public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
- this.pathResolver = pathResolver;
+ public ExternalWriter(IExternalWriter writer) {
this.writer = writer;
- this.maxResultPerFile = maxResultPerFile;
}
@Override
@@ -43,25 +35,8 @@
}
@Override
- public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
- partitionPath = pathResolver.getPartitionDirectory(tuple);
- if (UNRESOLVABLE_PATH != partitionPath) {
- writer.validate(partitionPath);
- newFile();
- }
- }
-
- @Override
public void write(IValueReference value) throws HyracksDataException {
- if (UNRESOLVABLE_PATH == partitionPath) {
- // Ignore writing values for unresolvable partition paths
- return;
- }
writer.write(value);
- tupleCounter++;
- if (tupleCounter >= maxResultPerFile) {
- newFile();
- }
}
@Override
@@ -73,12 +48,4 @@
public void close() throws HyracksDataException {
writer.close();
}
-
- private void newFile() throws HyracksDataException {
- tupleCounter = 0;
- if (!writer.newFile(partitionPath, pathResolver.getNextFileName())) {
- // the partitionPath could contain illegal chars or the length of the total path is too long
- partitionPath = UNRESOLVABLE_PATH;
- }
- }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
similarity index 71%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
index a4fa97b..abaaf68 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
@@ -18,14 +18,16 @@
*/
package org.apache.asterix.runtime.writer;
-import java.io.Serializable;
+import java.util.Map;
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
- /**
- * @return a new external file printer
- */
- IExternalFilePrinter createPrinter();
+public class ExternalWriterConfiguration {
+ private final Map<String, String> configuration;
+
+ public ExternalWriterConfiguration(Map<String, String> configuration) {
+ this.configuration = configuration;
+ }
+
+ public Map<String, String> getConfiguration() {
+ return configuration;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
index e7c0db0..2d1311f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -18,53 +18,24 @@
*/
package org.apache.asterix.runtime.writer;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-public class ExternalWriterFactory implements IExternalWriterFactory {
- private static final long serialVersionUID = 1412969574113419638L;
- private final IExternalFileWriterFactory writerFactory;
- private final IExternalFilePrinterFactory printerFactory;
- private final String fileExtension;
- private final int maxResult;
- private final IScalarEvaluatorFactory pathEvalFactory;
- private final String staticPath;
- private final SourceLocation pathSourceLocation;
+public class ExternalWriterFactory implements IHyracksExternalWriterFactory {
+ private static final long serialVersionUID = 1412969574113419639L;
+ private final IExternalWriterFactory writerFactory;
+ private final IExternalPrinterFactory printerFactory;
- public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
- String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
- SourceLocation pathSourceLocation) {
+ public ExternalWriterFactory(IExternalWriterFactory writerFactory, IExternalPrinterFactory printerFactory) {
this.writerFactory = writerFactory;
this.printerFactory = printerFactory;
- this.fileExtension = fileExtension;
- this.maxResult = maxResult;
- this.pathEvalFactory = pathEvalFactory;
- this.staticPath = staticPath;
- this.pathSourceLocation = pathSourceLocation;
}
@Override
- public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
- int partition = context.getTaskAttemptId().getTaskId().getPartition();
- char fileSeparator = writerFactory.getSeparator();
- IPathResolver resolver;
- if (staticPath == null) {
- EvaluatorContext evaluatorContext = new EvaluatorContext(context);
- IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
- IWarningCollector warningCollector = context.getWarningCollector();
- resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
- pathSourceLocation);
- } else {
- resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
- }
- IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
- return new ExternalWriter(resolver, writer, maxResult);
+ public IHyracksExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
+ IExternalWriter writer = writerFactory.createWriter(context, printerFactory);
+ return new ExternalWriter(writer);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
index 7a863f7..383f300 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -18,8 +18,7 @@
*/
package org.apache.asterix.runtime.writer;
-public interface IExternalFileFilterWriterFactoryProvider {
- IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration);
+public interface IExternalFileFilterWriterFactoryProvider extends IExternalFilterWriterFactoryProvider {
char getSeparator();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
index ba5fa1d..8db70ae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -21,17 +21,11 @@
import java.io.OutputStream;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
/**
- * An {@link IExternalFileWriter} printer
+ * An {@link IExternalWriter} printer
*/
-public interface IExternalFilePrinter {
-
- /**
- * Open the printer
- */
- void open() throws HyracksDataException;
+public interface IExternalFilePrinter extends IExternalPrinter {
/**
* Initialize the printer with a new stream
@@ -39,16 +33,4 @@
* @param outputStream to print to
*/
void newStream(OutputStream outputStream) throws HyracksDataException;
-
- /**
- * Print the provided value
- *
- * @param value to print
- */
- void print(IValueReference value) throws HyracksDataException;
-
- /**
- * Flush and close the printer
- */
- void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
index f8ae81b..197ac86 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -19,17 +19,11 @@
package org.apache.asterix.runtime.writer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
/**
* A file writer
*/
-public interface IExternalFileWriter {
-
- /**
- * Open the writer
- */
- void open() throws HyracksDataException;
+public interface IExternalFileWriter extends IExternalWriter {
/**
* Validate the writing directory
@@ -46,21 +40,4 @@
* @return true if a new file can be created, false otherwise
*/
boolean newFile(String directory, String fileName) throws HyracksDataException;
-
- /**
- * Writer the provided value
- *
- * @param value to write
- */
- void write(IValueReference value) throws HyracksDataException;
-
- /**
- * Run the abort sequence in case of a failure
- */
- void abort() throws HyracksDataException;
-
- /**
- * Flush the final result and close the writer
- */
- void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
index d8f1f84..9f42e94 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -18,34 +18,10 @@
*/
package org.apache.asterix.runtime.writer;
-import java.io.Serializable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * An interface for writing to a storage device
- * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
- */
-public interface IExternalFileWriterFactory extends Serializable {
- /**
- * Create a writer
- *
- * @param context task context
- * @param printerFactory printer factory for writing the final result
- * @return a new file writer
- */
- IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
- throws HyracksDataException;
+public interface IExternalFileWriterFactory extends IExternalWriterFactory {
/**
* @return file (or path) separator
*/
char getSeparator();
-
- /**
- * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
- */
- void validate() throws AlgebricksException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
similarity index 76%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
index a4fa97b..ca64544 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilterWriterFactoryProvider.java
@@ -18,14 +18,6 @@
*/
package org.apache.asterix.runtime.writer;
-import java.io.Serializable;
-
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
- /**
- * @return a new external file printer
- */
- IExternalFilePrinter createPrinter();
+public interface IExternalFilterWriterFactoryProvider {
+ IExternalWriterFactory create(ExternalWriterConfiguration configuration);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
new file mode 100644
index 0000000..7cd8597
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.OutputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalWriter} printer
+ */
+public interface IExternalPrinter {
+
+ /**
+ * Open the printer
+ */
+ void open() throws HyracksDataException;
+
+ /**
+ * Print the provided value
+ *
+ * @param value to print
+ */
+ void print(IValueReference value) throws HyracksDataException;
+
+ /**
+ * Flush and close the printer
+ */
+ void close() throws HyracksDataException;
+
+ /**
+ * @return the output stream
+ */
+ OutputStream getOutputStream();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
similarity index 86%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
index a4fa97b..4d9352a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
@@ -23,9 +23,9 @@
/**
* {@link IExternalFileWriter} printer factory
*/
-public interface IExternalFilePrinterFactory extends Serializable {
+public interface IExternalPrinterFactory extends Serializable {
/**
- * @return a new external file printer
+ * @return a new external printer
*/
- IExternalFilePrinter createPrinter();
+ IExternalPrinter createPrinter();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
similarity index 74%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
index 81ed880..a98de02 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriter.java
@@ -16,32 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.algebricks.runtime.writers;
+package org.apache.asterix.runtime.writer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
- * An external writer of a query (or dataset) result
+ * An external writer
*/
public interface IExternalWriter {
+
/**
* Open the writer
*/
void open() throws HyracksDataException;
/**
- * Initialize the writer for a new partition
+ * Write the provided value
*
- * @param tuple which contains the partitioning columns
- */
- void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
-
- /**
- * Write the provided value
- *
- * @param value to be written
+ * @param value to write
*/
void write(IValueReference value) throws HyracksDataException;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java
new file mode 100644
index 0000000..bf09503
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A serializable factory of {@link IExternalWriter} instances for writing to a storage device
+ * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
+ */
+public interface IExternalWriterFactory extends Serializable {
+ /**
+ * Create a writer
+ *
+ * @param context task context
+ * @param printerFactory printer factory for writing the final result
+ * @return a new writer
+ */
+ IExternalWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
+ throws HyracksDataException;
+
+ /**
+ * Perform the necessary validation to ensure the writer has the proper permissions
+ */
+ void validate() throws AlgebricksException;
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 2072dee..afacb59 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -65,6 +65,10 @@
SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
throws AlgebricksException;
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+ int[] keyColumns, IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated,
+ IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException;
+
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 7eef90e..6ee509e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -41,17 +41,22 @@
private final Mutable<ILogicalExpression> pathExpression;
private final List<Mutable<ILogicalExpression>> partitionExpressions;
private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+ private final List<Mutable<ILogicalExpression>> keyExpressions;
+ private final Mutable<Boolean> autogenerated;
private final IWriteDataSink writeDataSink;
public WriteOperator(Mutable<ILogicalExpression> sourceExpression, Mutable<ILogicalExpression> pathExpression,
List<Mutable<ILogicalExpression>> partitionExpressions,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
+ List<Mutable<ILogicalExpression>> keyExpressions, Mutable<Boolean> autogenerated,
IWriteDataSink writeDataSink) {
this.sourceExpression = sourceExpression;
this.pathExpression = pathExpression;
this.partitionExpressions = partitionExpressions;
this.orderExpressions = orderExpressions;
this.writeDataSink = writeDataSink;
+ this.keyExpressions = keyExpressions;
+ this.autogenerated = autogenerated;
}
public Mutable<ILogicalExpression> getSourceExpression() {
@@ -74,6 +79,18 @@
return orderExpressions;
}
+ public List<Mutable<ILogicalExpression>> getKeyExpressions() {
+ return keyExpressions;
+ }
+
+ public List<LogicalVariable> getKeyVariables() {
+ List<LogicalVariable> keyVariables = new ArrayList<>();
+ for (Mutable<ILogicalExpression> keyExpression : keyExpressions) {
+ keyVariables.add(VariableUtilities.getVariable(keyExpression.getValue()));
+ }
+ return keyVariables;
+ }
+
public List<LogicalVariable> getPartitionVariables() {
List<LogicalVariable> partitionVariables = new ArrayList<>();
for (Mutable<ILogicalExpression> partitionExpression : partitionExpressions) {
@@ -92,10 +109,18 @@
return orderColumns;
}
+ public boolean getAutogenerated() {
+ return autogenerated.getValue();
+ }
+
public IWriteDataSink getWriteDataSink() {
return writeDataSink;
}
+ public boolean isFileStoreSink() {
+ return keyExpressions.isEmpty() && !autogenerated.getValue();
+ }
+
@Override
public LogicalOperatorTag getOperatorTag() {
return LogicalOperatorTag.WRITE;
@@ -119,6 +144,10 @@
changed |= visitor.transform(orderExpressionPair.second);
}
+ for (Mutable<ILogicalExpression> expression : keyExpressions) {
+ changed |= visitor.transform(expression);
+ }
+
return changed;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fa47ae5..a240b50 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -297,9 +297,12 @@
deepCopyExpressionRefs(new ArrayList<>(), op.getPartitionExpressions());
List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderPairExpressions =
deepCopyOrderAndExpression(op.getOrderExpressions());
+ List<Mutable<ILogicalExpression>> newKeyPairExpressions =
+ deepCopyExpressionRefs(new ArrayList<>(), op.getKeyExpressions());
IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
return new WriteOperator(newSourceExpression, newPathExpression, newPartitionExpressions,
- newOrderPairExpressions, writeDataSink);
+ newOrderPairExpressions, newKeyPairExpressions, new MutableObject<>(op.getAutogenerated()),
+ writeDataSink);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index d462cd5..6c87fdb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -61,12 +61,16 @@
private final LogicalVariable sourceVariable;
private final List<LogicalVariable> partitionVariables;
private final List<OrderColumn> orderColumns;
+ private final List<LogicalVariable> keyVariables;
+ private final boolean autogenerated;
public SinkWritePOperator(LogicalVariable sourceVariable, List<LogicalVariable> partitionVariables,
- List<OrderColumn> orderColumns) {
+ List<OrderColumn> orderColumns, List<LogicalVariable> keyVariables, boolean autogenerated) {
this.sourceVariable = sourceVariable;
this.partitionVariables = partitionVariables;
this.orderColumns = orderColumns;
+ this.keyVariables = keyVariables;
+ this.autogenerated = autogenerated;
}
@Override
@@ -145,6 +149,11 @@
IBinaryComparatorFactory[] partitionComparatorFactories =
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, typeEnv, context);
+ // Key columns
+ int[] keyColumns = JobGenHelper.projectVariables(schema, keyVariables);
+ IBinaryComparatorFactory[] keyComparatorFactories =
+ JobGenHelper.variablesToAscBinaryComparatorFactories(keyVariables, typeEnv, context);
+
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
@@ -152,9 +161,17 @@
IMetadataProvider<?, ?> mp = context.getMetadataProvider();
- Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteFileRuntime(
- sourceColumn, partitionColumns, partitionComparatorFactories, dynamicPathEvalFactory, staticPathExpr,
- pathExpr.getSourceLocation(), writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints;
+ if (write.isFileStoreSink()) {
+ runtimeAndConstraints = mp.getWriteFileRuntime(sourceColumn, partitionColumns, partitionComparatorFactories,
+ dynamicPathEvalFactory, staticPathExpr, pathExpr.getSourceLocation(), writeDataSink, inputDesc,
+ typeEnv.getVarType(sourceVariable));
+
+ } else {
+ runtimeAndConstraints = mp.getWriteDatabaseWithKeyRuntime(sourceColumn, keyColumns, keyComparatorFactories,
+ autogenerated, writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
+ }
+
IPushRuntimeFactory runtime = runtimeAndConstraints.first;
runtime.setSourceLocation(write.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f945994..b440305 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -408,7 +408,8 @@
}
ensureAllVariables(op.getPartitionExpressions(), v -> v);
ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
- return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns());
+ return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns(),
+ op.getKeyVariables(), op.getAutogenerated());
}
@Override
@@ -648,4 +649,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
similarity index 62%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
copy to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
index 01e137b..7ffc65d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntime.java
@@ -21,46 +21,40 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
-final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+final class SinkExternalDatabaseWithKeyWriterRuntime extends AbstractOneInputSinkPushRuntime {
private final int sourceColumn;
- private final int[] partitionColumns;
private final IPointable sourceValue;
- private final PointableTupleReference partitionColumnsPrevCopy;
- private final PermutingFrameTupleReference partitionColumnsRef;
- private final IBinaryComparator[] partitionComparators;
- private final IExternalWriter writer;
+ private final int[] keyColumns;
+ private final IPointable[] keyValues;
+ private final IBinaryComparatorFactory[] keyComparatorFactories;
+ private final boolean autogenerated;
+ private final IHyracksExternalWriter writer;
private FrameTupleAccessor tupleAccessor;
private FrameTupleReference tupleRef;
- private boolean first;
private IFrameWriter frameWriter;
- SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
- RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+ SinkExternalDatabaseWithKeyWriterRuntime(int sourceColumn, int[] keyColumns,
+ IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated, RecordDescriptor inputRecordDesc,
+ IHyracksExternalWriter writer) {
this.sourceColumn = sourceColumn;
- this.partitionColumns = partitionColumns;
this.sourceValue = new VoidPointable();
- partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
- partitionColumnsPrevCopy =
- PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
- this.partitionComparators = partitionComparators;
+ this.keyColumns = keyColumns;
+ this.keyValues = new VoidPointable[keyColumns.length];
+ this.keyComparatorFactories = keyComparatorFactories;
+ this.autogenerated = autogenerated;
this.inputRecordDesc = inputRecordDesc;
this.writer = writer;
- first = true;
}
@Override
@@ -78,13 +72,8 @@
tupleAccessor.reset(buffer);
for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
tupleRef.reset(tupleAccessor, i);
- if (isNewPartition(i)) {
- writer.initNewPartition(tupleRef);
- }
setValue(tupleRef, sourceColumn, sourceValue);
writer.write(sourceValue);
- partitionColumnsRef.reset(tupleAccessor, i);
- partitionColumnsPrevCopy.set(partitionColumnsRef);
}
}
@@ -105,16 +94,6 @@
this.frameWriter = frameWriter;
}
- private boolean isNewPartition(int index) throws HyracksDataException {
- if (first) {
- first = false;
- return true;
- }
-
- return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, index, partitionColumns,
- partitionComparators);
- }
-
private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
byte[] data = tuple.getFieldData(column);
int start = tuple.getFieldStart(column);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java
new file mode 100644
index 0000000..fede2ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalDatabaseWithKeyWriterRuntimeFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class SinkExternalDatabaseWithKeyWriterRuntimeFactory extends AbstractPushRuntimeFactory {
+ private static final long serialVersionUID = -2215789207336628581L;
+ private final int sourceColumn;
+ private final int[] keyColumns;
+ private final IBinaryComparatorFactory[] keyComparatorFactories;
+ private final boolean autogenerated;
+ private final RecordDescriptor inputRecordDescriptor;
+ private final IHyracksExternalWriterFactory writerFactory;
+
+ public SinkExternalDatabaseWithKeyWriterRuntimeFactory(int sourceColumn, int[] keyColumns,
+ IBinaryComparatorFactory[] keyComparatorFactories, boolean autogenerated,
+ RecordDescriptor inputRecordDescriptor, IHyracksExternalWriterFactory writerFactory) {
+ this.sourceColumn = sourceColumn;
+ this.keyColumns = keyColumns;
+ this.keyComparatorFactories = keyComparatorFactories;
+ this.autogenerated = autogenerated;
+ this.inputRecordDescriptor = inputRecordDescriptor;
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ IHyracksExternalWriter writer = writerFactory.createWriter(ctx);
+ SinkExternalDatabaseWithKeyWriterRuntime runtime = new SinkExternalDatabaseWithKeyWriterRuntime(sourceColumn,
+ keyColumns, keyComparatorFactories, autogenerated, inputRecordDescriptor, writer);
+ return new IPushRuntime[] { runtime };
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
similarity index 89%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
index 01e137b..25c3bc0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntime.java
@@ -21,7 +21,8 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalFileWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -36,21 +37,21 @@
import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
-final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+final class SinkExternalFileWriterRuntime extends AbstractOneInputSinkPushRuntime {
private final int sourceColumn;
private final int[] partitionColumns;
private final IPointable sourceValue;
private final PointableTupleReference partitionColumnsPrevCopy;
private final PermutingFrameTupleReference partitionColumnsRef;
private final IBinaryComparator[] partitionComparators;
- private final IExternalWriter writer;
+ private final IHyracksExternalFileWriter writer;
private FrameTupleAccessor tupleAccessor;
private FrameTupleReference tupleRef;
private boolean first;
private IFrameWriter frameWriter;
- SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
- RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+ SinkExternalFileWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
+ RecordDescriptor inputRecordDesc, IHyracksExternalWriter writer) {
this.sourceColumn = sourceColumn;
this.partitionColumns = partitionColumns;
this.sourceValue = new VoidPointable();
@@ -59,7 +60,7 @@
PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
this.partitionComparators = partitionComparators;
this.inputRecordDesc = inputRecordDesc;
- this.writer = writer;
+ this.writer = (IHyracksExternalFileWriter) writer;
first = true;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
similarity index 79%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
index 6220dec..43bffe5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalFileWriterRuntimeFactory.java
@@ -20,25 +20,25 @@
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
-import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IHyracksExternalWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
+public final class SinkExternalFileWriterRuntimeFactory extends AbstractPushRuntimeFactory {
private static final long serialVersionUID = -2215789207336628581L;
private final int sourceColumn;
private final int[] partitionColumn;
private final IBinaryComparatorFactory[] partitionComparatorFactories;
private final RecordDescriptor inputRecordDescriptor;
- private final IExternalWriterFactory writerFactory;
+ private final IHyracksExternalWriterFactory writerFactory;
- public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
+ public SinkExternalFileWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
- IExternalWriterFactory writerFactory) {
+ IHyracksExternalWriterFactory writerFactory) {
this.sourceColumn = sourceColumn;
this.partitionColumn = partitionColumn;
this.partitionComparatorFactories = partitionComparatorFactories;
@@ -48,12 +48,12 @@
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
- IExternalWriter writer = writerFactory.createWriter(ctx);
+ IHyracksExternalWriter writer = writerFactory.createWriter(ctx);
IBinaryComparator[] partitionComparators = new IBinaryComparator[partitionComparatorFactories.length];
for (int i = 0; i < partitionComparatorFactories.length; i++) {
partitionComparators[i] = partitionComparatorFactories[i].createBinaryComparator();
}
- SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn, partitionColumn,
+ SinkExternalFileWriterRuntime runtime = new SinkExternalFileWriterRuntime(sourceColumn, partitionColumn,
partitionComparators, inputRecordDescriptor, writer);
return new IPushRuntime[] { runtime };
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
similarity index 67%
copy from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
copy to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
index e6899a6..3989394 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalFileWriter.java
@@ -18,21 +18,15 @@
*/
package org.apache.hyracks.algebricks.runtime.writers;
-import java.io.Serializable;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-/**
- * A writer factory which creates a writer for the result of a query (or dataset) to a storage device
- */
-@FunctionalInterface
-public interface IExternalWriterFactory extends Serializable {
+public interface IHyracksExternalFileWriter extends IHyracksExternalWriter {
+
/**
- * Crete a new writer
+ * Initialize the writer for a new partition
*
- * @param context task context
- * @return new writer
+ * @param tuple which contains the partitioning columns
*/
- IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
+ void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
similarity index 82%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
index 81ed880..00826e4 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriter.java
@@ -20,25 +20,17 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* An external writer of a query (or dataset) result
*/
-public interface IExternalWriter {
+public interface IHyracksExternalWriter {
/**
* Open the writer
*/
void open() throws HyracksDataException;
/**
- * Initialize the writer for a new partition
- *
- * @param tuple which contains the partitioning columns
- */
- void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
-
- /**
* Write the provided value
*
* @param value to be written
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
similarity index 88%
rename from hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
rename to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
index e6899a6..547fa9b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IHyracksExternalWriterFactory.java
@@ -27,12 +27,12 @@
* A writer factory which creates a writer for the result of a query (or dataset) to a storage device
*/
@FunctionalInterface
-public interface IExternalWriterFactory extends Serializable {
+public interface IHyracksExternalWriterFactory extends Serializable {
/**
* Crete a new writer
*
* @param context task context
* @return new writer
*/
- IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
+ IHyracksExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18169
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Icea1ff9f32fe49ee83d0739f5c5a305c3345faa7
Gerrit-Change-Number: 18169
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <hu...@gmail.com>
Gerrit-MessageType: newchange