Posted to commits@druid.apache.org by "adarshsanjeev (via GitHub)" <gi...@apache.org> on 2024/01/16 05:47:05 UTC

[PR] Add export capabilities to MSQ with SQL syntax (druid)

adarshsanjeev opened a new pull request, #15689:
URL: https://github.com/apache/druid/pull/15689

   Druid currently does not allow export of tables in a programmatic manner. While it is possible to download results from a SELECT query, this relies on writing the results to a single query report, which cannot support large datasets. An export syntax which writes the results in a desired format directly to an external location (such as S3 or HDFS) would be useful. The scope could be expanded later to also support a directory structure that partitions the data.
   
   
   This PR adds export capabilities to INSERT/REPLACE statements by using EXTERN as a destination:
   ```
   (INSERT/REPLACE) INTO
   EXTERN(<external source function>)
   AS <format>[(key1=value1,key2=value2)]
   [OVERWRITE ALL]
   <select query>
   ```
   
   For example, a statement to export all rows from a table to S3 as CSV files would look like:
   ```
   REPLACE
   INTO EXTERN(s3(uri="s3://outputdirectory/", username=...))
   AS CSV
   OVERWRITE ALL
   SELECT * FROM wikipedia
   ```
   
   INSERT statements append to the files found at the destination, while REPLACE deletes any files found at the root path and creates new files.
   
   #### Release note
   - Adds export syntax to MSQ, as part of INSERT and REPLACE statements.
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `sql/src/main/codegen/includes/common.ftl`
    * `sql/src/main/codegen/includes/replace.ftl`
    * `IngestHandler`
   
   <hr>
   
   
   This PR has:
   
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471243219


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java:
##########
@@ -106,12 +110,45 @@ protected String operationName()
   @Override
   public void validate()
   {
-    if (ingestNode().getPartitionedBy() == null) {
+    if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) {
+      if (!handlerContext.plannerContext().featureAvailable(EngineFeature.WRITE_EXTERNAL_DATA)) {

Review Comment:
   IngestHandler is used by native queries as well; it just throws an exception immediately there. Currently, this wouldn't be triggered, since the child classes of IngestHandler already check whether the statement is an INSERT or REPLACE.
   
   However, for correctness's sake, this check should be present, even if a test cannot reach it. If, in the future, a new engine is added which supports INSERT and REPLACE but not export, it would be needed.
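   
   For illustration, the shape of that guard might look like the following (a sketch; the error message and `InvalidSqlInput` usage are assumptions, while the other names come from the PR diff):
   ```java
   // Reject EXTERN destinations on engines that do not support export.
   if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier
       && !handlerContext.plannerContext().featureAvailable(EngineFeature.WRITE_EXTERNAL_DATA)) {
     throw InvalidSqlInput.exception("The engine used for this query does not support EXTERN destinations");
   }
   ```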





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471488886


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1780,6 +1788,11 @@ private static QueryDefinition makeQueryDefinition(
           MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
       );
       queryToPlan = querySpec.getQuery();
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+          MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
+      );
+      queryToPlan = querySpec.getQuery();

Review Comment:
   Added



##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java:
##########
@@ -34,13 +34,18 @@
  */
 public abstract class DruidSqlIngest extends SqlInsert
 {
+  public static final String SQL_EXPORT_FILE_FORMAT = "__exportFileFormat";

Review Comment:
   Added





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "317brian (via GitHub)" <gi...@apache.org>.
317brian commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480493752


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.

Review Comment:
   ```suggestion
   When you export data, use the `rowsPerPage` context parameter to control how many rows get exported. The default is 100,000.
   ```





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481443465


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.s3.S3InputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+@JsonTypeName(S3ExportStorageProvider.TYPE_NAME)
+public class S3ExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = S3InputSource.TYPE_KEY;
+  @JsonProperty
+  private final String bucket;
+  @JsonProperty
+  private final String prefix;
+
+  @JacksonInject
+  S3ExportConfig s3ExportConfig;
+  @JacksonInject
+  ServerSideEncryptingAmazonS3 s3;
+
+  @JsonCreator
+  public S3ExportStorageProvider(
+      @JsonProperty(value = "bucket", required = true) String bucket,
+      @JsonProperty(value = "prefix", required = true) String prefix
+  )
+  {
+    this.bucket = bucket;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final String tempDir = s3ExportConfig.getTempDir();
+    if (tempDir == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build("The runtime property `druid.export.storage.s3.tempDir` must be configured for S3 export.");
+    }
+    final List<String> allowedExportPaths = s3ExportConfig.getAllowedExportPaths();
+    if (allowedExportPaths == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.s3.allowedExportPaths` must be configured for S3 export.");
+    }
+    validateS3Prefix(allowedExportPaths, bucket, prefix);
+    final S3OutputConfig s3OutputConfig = new S3OutputConfig(
+        bucket,
+        prefix,
+        new File(tempDir),
+        s3ExportConfig.getChunkSize(),
+        s3ExportConfig.getMaxRetry()
+    );
+    return new S3StorageConnector(s3OutputConfig, s3);
+  }
+
+  @VisibleForTesting
+  static void validateS3Prefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
+  {
+    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(S3StorageDruidModule.SCHEME);
+    for (final String path : allowedExportPaths) {
+      final URI allowedUri = URI.create(path);
+      if (validateUri(allowedUri, providedUri)) {
+        return;
+      }
+    }
+    throw DruidException.forPersona(DruidException.Persona.USER)

Review Comment:
   The error message is more likely to be due to user error in specifying the path than a permission issue, so keeping the persona as USER makes sense.
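   
   Concretely, that might look something like the following (a sketch; the category and message wording are assumptions, while the builder pattern matches the code above):
   ```java
   throw DruidException.forPersona(DruidException.Persona.USER)
                       .ofCategory(DruidException.Category.INVALID_INPUT)
                       .build(
                           "The provided destination [%s] does not match any of the allowed export paths.",
                           providedUri
                       );
   ```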





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475764572


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1872,6 +1871,44 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final StorageConnectorProvider storageConnectorProvider = exportMSQDestination.getStorageConnectorProvider();
+
+      final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
+
+      // If the statement is a 'REPLACE' statement, delete the existing files at the destination.
+      if (exportMSQDestination.getReplaceTimeChunks() != null) {
+        if (Intervals.ONLY_ETERNITY.equals(exportMSQDestination.getReplaceTimeChunks())) {
+          StorageConnector storageConnector = storageConnectorProvider.get();
+          try {
+            storageConnector.deleteRecursively("");

Review Comment:
   Instead of removing the delete call, we can add a directory such as `druid-export/` to the path and delete only that directory for REPLACE. This way, we would not delete any other files unless the user stores them in a directory with the same name.
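   
   A rough sketch of that approach (the subdirectory constant and helper method are hypothetical; `deleteRecursively` is the connector method used above):
   ```java
   // Scope REPLACE deletes to a fixed subdirectory, so files outside it survive.
   private static final String EXPORT_SUBDIR = "druid-export/";
   
   private static void clearExportDirectory(final StorageConnector storageConnector) throws IOException
   {
     // Only files previously written by Druid under druid-export/ are removed;
     // the rest of the destination is left untouched.
     storageConnector.deleteRecursively(EXPORT_SUBDIR);
   }
   ```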





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471132471


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);
+    try (OutputStream stream = storageConnector.write(exportFilePath)) {
+      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
+
+      SequenceUtils.forEach(
+          cursorSequence,
+          cursor -> {
+            try {
+              formatter.writeResponseStart();

Review Comment:
   Cool! Would that require parser changes? If so, I think adding a header should be the default (in this PR). I am not sure of this though, so I am fine with either case.
   





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471495318


##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()

Review Comment:
   Changed to deserialize to StorageConnectorProvider in ExportDestination





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480833441


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.

Review Comment:
   The `AS` clause would not be an argument to EXTERN; it's present elsewhere in the query. Would it be confusing to call it an argument?





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461756687


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information

Review Comment:
   Updated the comment



##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information
+ * required for a task to write to a destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final ExportDestination exportDestination;
+  private final Map<String, String> propertiesForUnparse;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      ExportDestination exportDestination,
+      Map<String, String> propertiesForUnparse
+  )
+  {
+    super(name, pos);
+    this.exportDestination = exportDestination;
+    this.propertiesForUnparse = propertiesForUnparse;
+  }
+
+  public ExportDestination getExportDestination()
+  {
+    return exportDestination;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
+  {
+    SqlWriter.Frame externFrame = writer.startFunCall("EXTERN");
+    SqlWriter.Frame frame = writer.startFunCall(Iterables.getOnlyElement(names));
+    for (Map.Entry<String, String> property : propertiesForUnparse.entrySet()) {
+      writer.sep(",");
+      writer.keyword(property.getKey());
+      writer.print("=");
+      writer.identifier(property.getValue(), false);
+    }
+    writer.endFunCall(frame);
+    writer.endFunCall(externFrame);
+  }
+
+  @Override
+  public SqlNode clone(SqlParserPos pos)
+  {
+    return new ExternalDestinationSqlIdentifier(Iterables.getOnlyElement(names), pos, exportDestination, propertiesForUnparse);
+  }
+
+  @Override
+  @Deprecated
+  public Object clone()
+  {
+    throw new UnsupportedOperationException("Function is deprecated, please use clone(SqlNode) instead.");

Review Comment:
   Changed





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1456925619


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information
+ * required for a task to write to a destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final ExportDestination exportDestination;
+  private final Map<String, String> propertiesForUnparse;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      ExportDestination exportDestination,
+      Map<String, String> propertiesForUnparse
+  )
+  {
+    super(name, pos);
+    this.exportDestination = exportDestination;
+    this.propertiesForUnparse = propertiesForUnparse;
+  }
+
+  public ExportDestination getExportDestination()
+  {
+    return exportDestination;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
+  {
+    SqlWriter.Frame externFrame = writer.startFunCall("EXTERN");
+    SqlWriter.Frame frame = writer.startFunCall(Iterables.getOnlyElement(names));

Review Comment:
   We should use something other than Iterables.getOnlyElement, to prevent a cryptic error message. As I understand it, this case wouldn't be reached because of the SQL parser; however, a `DruidException.defensive` seems more appropriate.
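   
   For instance (a sketch; the exact message is an assumption):
   ```java
   // Fail with a defensive error instead of Guava's generic IllegalArgumentException
   // if the identifier unexpectedly holds more than one name component.
   if (names.size() != 1) {
     throw DruidException.defensive("Expected a single name for the external destination, got [%s]", names);
   }
   final SqlWriter.Frame frame = writer.startFunCall(names.get(0));
   ```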



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/MSQSelectDestination.java:
##########
@@ -30,6 +30,10 @@ public enum MSQSelectDestination
    * Writes all the results directly to the report.
    */
   TASKREPORT("taskReport", false),
+  /**
+   * Writes the results as rows to a location.

Review Comment:
   "Location" is generic; this can be expanded a bit further.



##########
sql/src/main/codegen/includes/replace.ftl:
##########
@@ -20,33 +20,52 @@
 // Taken from syntax of SqlInsert statement from calcite parser, edited for replace syntax
 SqlNode DruidSqlReplaceEof() :
 {
-    SqlNode table;
+    final SqlIdentifier destination;
     SqlNode source;
     SqlNodeList columnList = null;
     final Span s;
+    SqlNode tableRef = null;
     SqlInsert sqlInsert;
     // Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
     org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
     SqlNodeList clusteredBy = null;
     final Pair<SqlNodeList, SqlNodeList> p;
     SqlNode replaceTimeQuery = null;
+    String exportFileFormat = null;
 }
 {
     <REPLACE> { s = span(); }
     <INTO>
-    table = CompoundIdentifier()
-    [
-        p = ParenthesizedCompoundIdentifierList() {
-            if (p.left.size() > 0) {
-                columnList = p.left;
-            }
+    (
+      LOOKAHEAD(2)
+      <EXTERN> <LPAREN> destination = ExternalDestination() <RPAREN>
+      |
+      destination = CompoundTableIdentifier()
+      ( tableRef = TableHints(destination) | { tableRef = destination; } )
+      [ LOOKAHEAD(5) tableRef = ExtendTable(tableRef) ]
+    )
+    (
+      LOOKAHEAD(2)
+      p = ParenthesizedCompoundIdentifierList() {
+        if (p.right.size() > 0) {
+          tableRef = extend(tableRef, p.right);
         }
+        if (p.left.size() > 0) {
+          columnList = p.left;
+        } else {
+          columnList = null;
+        }
+      }
+      | { columnList = null; }

Review Comment:
   Seems like there is some code duplication between INSERT and REPLACE. Can we unify some of the code and keep only the high-level constructs in place?



##########
sql/src/main/codegen/config.fmpp:
##########
@@ -65,10 +70,16 @@ data: {
       "CLUSTERED"
       "OVERWRITE"
       "PARTITIONED"
+      "EXTERN"
+      "S3"
+      "CSV"

Review Comment:
   Why make it a keyword? If we can, let's keep it as non-reserved only.
   
   Also, it would be cool to have tests that check the parsing when any of these keywords appear in the table identifier list, something like `SELECT csv, s3, col3, col4 FROM extern` (and a few permutations of these). Queries like these will most likely break if EXTERN is a keyword (because then the parser will prevent it from being an identifier). I am also curious what happens if these queries are run now - IMO they should pass, because the current EXTERN(..) function is like a SQL function and not a syntax change.
   
   Some more examples can be: OVERWRITE INTO EXTERN(....) SELECT col1 FROM extern WHERE....
   (Here the first extern refers to the new syntax, while the second one refers to the table.)
   
   If the parsing is correct in these edge cases, we can probably make do with keeping it as non-reserved only. Else, this change needs to be called out.
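   
   A sketch of what such parser tests could look like (the assertion helper is hypothetical):
   ```java
   @Test
   public void testExportKeywordsRemainValidIdentifiers()
   {
     // These should continue to parse if EXTERN/S3/CSV stay non-reserved:
     assertParsesSuccessfully("SELECT csv, s3, col3, col4 FROM extern");
     assertParsesSuccessfully(
         "REPLACE INTO EXTERN(s3(uri=\"s3://bucket/path/\")) AS CSV OVERWRITE ALL SELECT col1 FROM extern"
     );
   }
   ```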



##########
server/src/main/java/org/apache/druid/catalog/model/table/export/ExportDestination.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table.export;
+
+import org.apache.druid.catalog.model.table.IngestDestination;
+
+public interface ExportDestination extends IngestDestination

Review Comment:
   Also, is this level of indirection required? Are we planning on adding some methods here?



##########
sql/src/main/codegen/includes/insert.ftl:
##########
@@ -21,68 +21,68 @@
  * Parses an INSERT statement. This function is copied from SqlInsert in core/src/main/codegen/templates/Parser.jj,
  * with some changes to allow a custom error message if an OVERWRITE clause is present.
  */
-SqlNode DruidSqlInsert() :
+// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
+SqlNode DruidSqlInsertEof() :
 {
-    final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
-    final SqlNodeList keywordList;
-    final SqlIdentifier tableName;
-    SqlNode tableRef;
-    SqlNode source;
-    final SqlNodeList columnList;
-    final Span s;
-    final Pair<SqlNodeList, SqlNodeList> p;
+  SqlNode insertNode;
+  final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
+  final SqlNodeList keywordList;
+  final SqlIdentifier destination;
+  SqlNode tableRef = null;
+  SqlNode source;
+  final SqlNodeList columnList;
+  final Span s;
+  final Pair<SqlNodeList, SqlNodeList> p;
+  org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+  SqlNodeList clusteredBy = null;
+  String exportFileFormat = null;
 }
 {
-    (
-        <INSERT>
+  (
+    <INSERT>
     |
-        <UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
-    )
-    { s = span(); }
-    SqlInsertKeywords(keywords) {
-        keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
-    }
-    <INTO> tableName = CompoundTableIdentifier()
-    ( tableRef = TableHints(tableName) | { tableRef = tableName; } )
+    <UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
+  )
+  { s = span(); }
+  SqlInsertKeywords(keywords) {
+    keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
+  }
+  <INTO>
+  (
+    LOOKAHEAD(2)
+    <EXTERN> <LPAREN> destination = ExternalDestination() <RPAREN>
+    |
+    destination = CompoundTableIdentifier()
+    ( tableRef = TableHints(destination) | { tableRef = destination; } )
     [ LOOKAHEAD(5) tableRef = ExtendTable(tableRef) ]
-    (
-        LOOKAHEAD(2)
-        p = ParenthesizedCompoundIdentifierList() {
-            if (p.right.size() > 0) {
-                tableRef = extend(tableRef, p.right);
-            }
-            if (p.left.size() > 0) {
-                columnList = p.left;
-            } else {
-                columnList = null;
-            }
-        }
-    |   { columnList = null; }
-    )
-    (
+  )
+  (
+    LOOKAHEAD(2)

Review Comment:
   Why is this lookahead required?



##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information
+ * required for a task to write to a destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final ExportDestination exportDestination;
+  private final Map<String, String> propertiesForUnparse;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      ExportDestination exportDestination,
+      Map<String, String> propertiesForUnparse
+  )
+  {
+    super(name, pos);
+    this.exportDestination = exportDestination;
+    this.propertiesForUnparse = propertiesForUnparse;
+  }
+
+  public ExportDestination getExportDestination()
+  {
+    return exportDestination;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
+  {
+    SqlWriter.Frame externFrame = writer.startFunCall("EXTERN");
+    SqlWriter.Frame frame = writer.startFunCall(Iterables.getOnlyElement(names));
+    for (Map.Entry<String, String> property : propertiesForUnparse.entrySet()) {
+      writer.sep(",");
+      writer.keyword(property.getKey());
+      writer.print("=");
+      writer.identifier(property.getValue(), false);
+    }
+    writer.endFunCall(frame);
+    writer.endFunCall(externFrame);
+  }
+
+  @Override
+  public SqlNode clone(SqlParserPos pos)
+  {
+    return new ExternalDestinationSqlIdentifier(Iterables.getOnlyElement(names), pos, exportDestination, propertiesForUnparse);
+  }
+
+  @Override
+  @Deprecated
+  public Object clone()
+  {
+    throw new UnsupportedOperationException("Function is deprecated, please use clone(SqlNode) instead.");

Review Comment:
   nit: DruidException.defensive, to let users know that it's not an issue on their end if they ever encounter it.
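   
   That is, something like (a sketch):
   ```java
   @Override
   @Deprecated
   public Object clone()
   {
     // Defensive: flags an internal error rather than blaming the user.
     throw DruidException.defensive("Use clone(SqlParserPos) instead of clone()");
   }
   ```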



##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information

Review Comment:
   What's an external table?





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "gargvishesh (via GitHub)" <gi...@apache.org>.
gargvishesh commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1453041762


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java:
##########
@@ -153,15 +154,15 @@ public OverlordClient overlordClient()
 
   @Override
   public QueryMaker buildQueryMakerForInsert(
-      final String targetDataSource,
+      final IngestDestination targetDestination,

Review Comment:
   nit: can just call `destination` or `ingestDestination`



##########
server/src/main/java/org/apache/druid/catalog/model/table/export/ExportDestination.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table.export;
+
+import org.apache.druid.catalog.model.table.IngestDestination;
+
+public interface ExportDestination extends IngestDestination

Review Comment:
   Do the present changes support local disk file export as well?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java:
##########
@@ -153,15 +154,15 @@ public OverlordClient overlordClient()
 
   @Override
   public QueryMaker buildQueryMakerForInsert(
-      final String targetDataSource,
+      final IngestDestination targetDestination,

Review Comment:
   Same applies to a few other places as well





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1470849528


##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.export.TestExportStorageConnector;
+import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MSQExportTest extends MSQTestBase
+{
+  @Test
+  public void testExport() throws IOException

Review Comment:
   Added a test





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1474655343


##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java:
##########
@@ -176,4 +188,83 @@
 
     msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
   }
+
+  @Test
+  public void testExport() throws Exception
+  {
+    String exportQuery =
+        StringUtils.format(
+            "REPLACE INTO extern(%s(basePath = '%s'))\n"
+            + "AS CSV\n"
+            + "OVERWRITE ALL\n"
+            + "SELECT page, added, delta\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":
 \"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+            + "  )\n"
+            + ")\n",
+            "localStorage", "/shared/export"
+        );
+
+    SqlTaskStatus exportTask = msqHelper.submitMsqTask(exportQuery);
+
+    msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
+
+    if (exportTask.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          exportTask.getError()

Review Comment:
   ## Use of default toString()
   
   Default toString(): ErrorResponse inherits toString() from Object, and so is not suitable for printing.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6532)
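   
   A possible fix (a sketch; `jsonMapper` stands in for whatever `ObjectMapper` the test has available):
   ```java
   // Serialize the ErrorResponse explicitly instead of relying on Object.toString().
   Assert.fail(StringUtils.format(
       "Unable to start the task successfully.\nPossible exception: %s",
       jsonMapper.writeValueAsString(exportTask.getError())
   ));
   ```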





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475613847


##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java:
##########
@@ -176,4 +188,83 @@ public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
 
     msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
   }
+
+  @Test
+  public void testExport() throws Exception

Review Comment:
   Looking into this.
   
   The new revised integration tests don't seem to enable auth by default,
   https://github.com/apache/druid/blob/6d617c34d29509ca41513ed3f964b1430faf5c2f/integration-tests-ex/cases/cluster/Common/environment-configs/auth.env#L18-L20
   and there don't seem to be any tests for permissions currently. It might take a little time to set up a role which doesn't have extern permissions for this test.





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475840275


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination to which the data should be exported.
+This variation of EXTERN requires one argument: the details of the destination, as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) are not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage`, which controls the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+REPLACE statements have an additional OVERWRITE clause. As partitioning is not yet supported, only `OVERWRITE ALL`
+is allowed. REPLACE deletes any existing files in the specified directory and creates new files with the results of the query.
+
+
+```sql
+REPLACE INTO
+  EXTERN(<destination function>)
+AS CSV
+OVERWRITE ALL
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage. This can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` extension must be loaded.
+
+```sql
+INSERT INTO
+  EXTERN(S3(bucket=<...>, prefix=<...>, tempDir=<...>))

Review Comment:
   Changed the example here





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1477980304


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final Map<String, String> properties;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      Map<String, String> properties
+  )
+  {
+    super(name, pos);
+    this.properties = properties;
+  }
+
+  public String getDestinationType()

Review Comment:
   Changed this to refer directly to the input source types.





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1478013087


##########
processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.local;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConfig;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+@JsonTypeName(LocalFileExportStorageProvider.TYPE_NAME)
+public class LocalFileExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = LocalInputSource.TYPE_KEY;
+
+  @JacksonInject
+  StorageConfig storageConfig;
+
+  @JsonProperty
+  private final String exportPath;
+
+  @JsonCreator
+  public LocalFileExportStorageProvider(@JsonProperty(value = "exportPath", required = true) String exportPath)
+  {
+    this.exportPath = exportPath;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final String baseDir = storageConfig.getBaseDir();
+    if (baseDir == null) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("The runtime property `druid.export.storage.baseDir` must be configured.");
+    }
+    try {
+      final File exportDestination = new File(baseDir, exportPath);
+      final String finalOutputPath = exportDestination.getCanonicalPath();
+      if (!finalOutputPath.startsWith(baseDir)) {

Review Comment:
   ## Partial path traversal vulnerability
   
   Partial Path Traversal Vulnerability due to insufficient guard against path traversal.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6539)
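   
   One common hardening, sketched here: canonicalize both sides and compare whole path segments with `java.nio.file.Path#startsWith`, so that `/base/dirEvil` is not accepted as being under `/base/dir`. The helper class name is illustrative, not from the PR.
   
   ```java
   import java.io.File;
   import java.io.IOException;
   import java.nio.file.Path;
   
   final class ExportPathGuard
   {
     // Returns the validated canonical output file, or throws if exportPath
     // escapes baseDir. Path#startsWith compares whole path segments, avoiding
     // the "/base/dirEvil".startsWith("/base/dir") pitfall of String#startsWith.
     static File validate(final String baseDir, final String exportPath) throws IOException
     {
       final Path basePath = new File(baseDir).getCanonicalFile().toPath();
       final Path outputPath = new File(baseDir, exportPath).getCanonicalFile().toPath();
       if (!outputPath.startsWith(basePath)) {
         throw new IllegalArgumentException("Export path escapes the configured base directory");
       }
       return outputPath.toFile();
     }
   }
   ```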





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1465912735


##########
web-console/src/views/workbench-view/query-tab/query-tab.tsx:
##########
@@ -429,6 +429,7 @@
               </div>
             ) : (
               <div>Unknown query execution state</div>
+                />

Review Comment:
   ## Syntax error
   
   Error: Expression expected.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6504)





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1470906712


##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java:
##########
@@ -114,6 +115,12 @@
     );
   }
 
+  @Test
+  public void name()
+  {
+    ImmutableMap<String, String> stringStringImmutableMap = ImmutableMap.of("type", "local", "basePath", "/path");

Review Comment:
   ## Unread local variable
   
   Variable 'ImmutableMap<String,String> stringStringImmutableMap' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6529)





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1477175973


##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java:
##########
@@ -176,4 +188,83 @@
 
     msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
   }
+
+  @Test
+  public void testExport() throws Exception
+  {
+    String exportQuery =
+        StringUtils.format(
+            "INSERT INTO extern(%s(basePath => '%s'))\n"
+            + "AS CSV\n"
+            + "SELECT page, added, delta\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":
 \"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+            + "  )\n"
+            + ")\n",
+            "local", "/shared/export/"
+        );
+
+    SqlTaskStatus exportTask = msqHelper.submitMsqTask(exportQuery);
+
+    msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
+
+    if (exportTask.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          exportTask.getError()

Review Comment:
   ## Use of default toString()
   
   Default toString(): ErrorResponse inherits toString() from Object, and so is not suitable for printing.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6538)





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1463090420


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageExportConfig.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.google.inject.Injector;
+import org.apache.druid.catalog.model.table.export.ExportSourceConfig;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnectorProvider;
+
+import java.io.File;
+import java.util.Map;
+
+public class S3StorageExportConfig implements ExportSourceConfig
+{
+  @Override
+  public StorageConnectorProvider get(Map<String, String> properties, Injector injector)
+  {
+    return new S3StorageConnectorProvider(
+        properties.get("bucket"),
+        properties.get("prefix"),
+        new File(properties.get("tempDir")),
+        HumanReadableBytes.valueOf(Integer.parseInt(properties.get("chunkSize"))),

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6500)



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageExportConfig.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.google.inject.Injector;
+import org.apache.druid.catalog.model.table.export.ExportSourceConfig;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnectorProvider;
+
+import java.io.File;
+import java.util.Map;
+
+public class S3StorageExportConfig implements ExportSourceConfig
+{
+  @Override
+  public StorageConnectorProvider get(Map<String, String> properties, Injector injector)
+  {
+    return new S3StorageConnectorProvider(
+        properties.get("bucket"),
+        properties.get("prefix"),
+        new File(properties.get("tempDir")),
+        HumanReadableBytes.valueOf(Integer.parseInt(properties.get("chunkSize"))),
+        Integer.parseInt(properties.get("maxRetry")),

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6499)
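   
   A sketch of one way to address both flagged lines: wrap the parse and rethrow with a descriptive message. The helper name and error wording are illustrative, not from the PR.
   
   ```java
   import java.util.Map;
   
   final class PropertyParsing
   {
     // Parses a required integer property, converting NumberFormatException
     // (and a missing key) into a descriptive IllegalArgumentException.
     static int parseIntProperty(final Map<String, String> properties, final String key)
     {
       final String value = properties.get(key);
       if (value == null) {
         throw new IllegalArgumentException("Missing required property: " + key);
       }
       try {
         return Integer.parseInt(value);
       }
       catch (NumberFormatException e) {
         throw new IllegalArgumentException(
             "Property [" + key + "] must be an integer, got [" + value + "]", e);
       }
     }
   }
   ```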





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471114058


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java:
##########
@@ -106,12 +110,45 @@ protected String operationName()
   @Override
   public void validate()
   {
-    if (ingestNode().getPartitionedBy() == null) {
+    if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) {
+      if (!handlerContext.plannerContext().featureAvailable(EngineFeature.WRITE_EXTERNAL_DATA)) {

Review Comment:
   Is this validation necessary? `IngestHandler` is only supported via the MSQ engine, so will this message ever show up? If so, let's add a test for it.



##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()

Review Comment:
   I am still not convinced that storing a map of properties is a good idea. I think that's the limitation we are working with when translating from the SQL layer (unless there were a separate grammar for each destination type).
   IMO it should be deserialized into `StorageConnectorProvider` when it gets stored in the `ExportDestination` at creation time, not at the time of application. It's not clear what the properties are supposed to represent unless you dig into the code, hence my concern. We have that context when we create this object.



##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java:
##########
@@ -34,13 +34,18 @@
  */
 public abstract class DruidSqlIngest extends SqlInsert
 {
+  public static final String SQL_EXPORT_FILE_FORMAT = "__exportFileFormat";

Review Comment:
   Is there validation to prevent the user from setting this field? I think we have some restricted context parameters that we can set up, and perhaps we should piggyback on that, or add our own validation (if there isn't). Also, a test to verify that.



##########
sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java:
##########
@@ -118,5 +118,9 @@ public enum EngineFeature
    * and cannot concat the results together (as * the result for broker is the query id). Therefore, we don't get the
    * correct result back, while the MSQ engine is executing the partial query
    */
-  ALLOW_TOP_LEVEL_UNION_ALL;
+  ALLOW_TOP_LEVEL_UNION_ALL,
+  /**
+   * Queries can write to an external datasource using {@link org.apache.druid.sql.destination.ExportDestination}
+   */
+  WRITE_EXTERNAL_DATA;

Review Comment:
   minor nit: Can be moved below `READ_EXTERNAL_DATA`. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1780,6 +1788,11 @@ private static QueryDefinition makeQueryDefinition(
           MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
       );
       queryToPlan = querySpec.getQuery();
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+          MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
+      );
+      queryToPlan = querySpec.getQuery();

Review Comment:
   nit: I was wondering if MSQDestination can have a method called `getShuffleSpecFactory(int targetSize)`, and all three destinations implement it. That would get rid of the ugly instanceof checks here.
   
   In case there are no other pressing changes to be made post this review, I am fine with keeping it the same. 



##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java:
##########
@@ -106,12 +110,45 @@ protected String operationName()
   @Override
   public void validate()
   {
-    if (ingestNode().getPartitionedBy() == null) {
+    if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) {

Review Comment:
   Refactoring comment: This seems slightly messy, in that we are interleaving the validation for ingesting into a datasource with the validation for export. It might be easier to read if the top-level validation had the following structure:
   ```java
   if (ingestNode.getTargetTable() instanceof ExternalDestinationSqlIdentifier) {
      validateExport();
   }
   else if (ingestNode.getTargetTable() instanceof DataSource(?)) {
     validateIngest();
   }
   else {
     throw;
   }
   ```





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461765810


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.utils.CollectionUtils;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final SqlCharStringLiteral exportDestinationString;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      SqlNode exportDestinationString
+  )
+  {
+    super(name, pos);
+    this.exportDestinationString = (SqlCharStringLiteral) exportDestinationString;
+  }
+
+  public String getExportDestinationString()
+  {
+    return exportDestinationString.toString();
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
+  {
+    SqlWriter.Frame externFrame = writer.startFunCall("EXTERN");
+    writer.print(exportDestinationString.toString());
+    writer.endFunCall(externFrame);
+  }
+
+  @Override
+  public SqlNode clone(SqlParserPos pos)
+  {
+    final String name = CollectionUtils.getOnlyElement(
+        names,
+        x -> DruidException.defensive("Expected single name in identifier [%s], but got [%s]", names)

Review Comment:
   ## Missing format argument
   
   This format call refers to 2 argument(s) but only supplies 1 argument(s).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6493)
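   
   For reference, a sketch of a corrected call; since the intended first argument is unclear from the diff, this version drops to a single placeholder:
   
   ```java
   // Sketch: one placeholder and one argument, so the message always renders.
   x -> DruidException.defensive("Expected a single name in identifier, but got [%s]", names)
   ```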





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481373795


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.s3.S3InputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+@JsonTypeName(S3ExportStorageProvider.TYPE_NAME)
+public class S3ExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = S3InputSource.TYPE_KEY;
+  @JsonProperty
+  private final String bucket;
+  @JsonProperty
+  private final String prefix;
+
+  @JacksonInject
+  S3ExportConfig s3ExportConfig;
+  @JacksonInject
+  ServerSideEncryptingAmazonS3 s3;
+
+  @JsonCreator
+  public S3ExportStorageProvider(
+      @JsonProperty(value = "bucket", required = true) String bucket,
+      @JsonProperty(value = "prefix", required = true) String prefix
+  )
+  {
+    this.bucket = bucket;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final String tempDir = s3ExportConfig.getTempDir();
+    if (tempDir == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build("The runtime property `druid.export.storage.s3.tempDir` must be configured for S3 export.");
+    }
+    final List<String> allowedExportPaths = s3ExportConfig.getAllowedExportPaths();
+    if (allowedExportPaths == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.s3.allowedExportPaths` must be configured for S3 export.");
+    }
+    validateS3Prefix(allowedExportPaths, bucket, prefix);
+    final S3OutputConfig s3OutputConfig = new S3OutputConfig(
+        bucket,
+        prefix,
+        new File(tempDir),
+        s3ExportConfig.getChunkSize(),
+        s3ExportConfig.getMaxRetry()
+    );
+    return new S3StorageConnector(s3OutputConfig, s3);
+  }
+
+  @VisibleForTesting
+  static void validateS3Prefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
+  {
+    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(S3StorageDruidModule.SCHEME);
+    for (final String path : allowedExportPaths) {
+      final URI allowedUri = URI.create(path);
+      if (validateUri(allowedUri, providedUri)) {
+        return;
+      }
+    }
+    throw DruidException.forPersona(DruidException.Persona.USER)

Review Comment:
   Should the error be better addressed for `Persona.ADMIN` then?
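   
   That is, something along these lines (a sketch; the category and wording are illustrative):
   
   ```java
   // Sketch: address the error to the cluster admin, who controls allowedExportPaths.
   throw DruidException.forPersona(DruidException.Persona.ADMIN)
                       .ofCategory(DruidException.Category.INVALID_INPUT)
                       .build("None of the allowed export paths match the requested destination [%s]", providedUri);
   ```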





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481301885


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,89 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination where you want to export data.
+This variation of EXTERN requires one argument: the details of the destination, as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Keep the following in mind when using EXTERN to export rows:
+- Only INSERT statements are supported.
+- Only `CSV` format is supported as an export format.
+- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements.
+- You can export to Amazon S3 or local storage.
+- The destination provided should contain no other files or directories.
+
+When you export data, use the `rowsPerPage` context parameter to control how many rows are written to each exported file. The default is 100,000.
+
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+##### S3
+
+Export results to S3 by passing the function `S3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions` extension.
+The `S3()` function is a Druid function that configures the connection. Arguments for `S3()` should be passed as named parameters, with the value in single quotes, like the following example:
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments for the function:
+
+| Parameter   | Required | Description                                                                                                                                                           | Default |
+|-------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported.                                                                                                                         | n/a     |
+| `prefix`    | Yes      | Path where the exported files will be created. The export query expects the destination to be empty. If the location includes other files, the query will fail.       | n/a     |

Review Comment:
   We should mention that the bucket and prefix should match whatever the cluster admin has provided. 



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1877,6 +1876,42 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();
+
+      try {
+        // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
+        Iterator<String> filesIterator = exportStorageProvider.get().listDir("");
+        if (filesIterator.hasNext()) {
+          throw DruidException.forPersona(DruidException.Persona.USER)
+                              .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                              .build("Found files at provided export destination. Export is only allowed to "

Review Comment:
   Also mention that the user can append a subdirectory.





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1466273549


##########
server/src/main/java/org/apache/druid/catalog/model/table/export/ExportDestination.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.catalog.model.table.export;
+
+import org.apache.druid.catalog.model.table.IngestDestination;
+
+public interface ExportDestination extends IngestDestination

Review Comment:
   We support local disk export; however, it is currently only added in the test module. It would only be used for debugging or for tests.
   
   This class has also been edited to contain more methods. Initially, it was added so that ExportDestination could be used directly by MSQ instead of adding an instanceof check everywhere.





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471054825


##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()

Review Comment:
   Changed to deserialize this instead via the objectMapper, which simplified the code
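   
   A rough sketch of that approach, assuming `ExportStorageProvider` implementations are registered as polymorphic Jackson subtypes keyed on `type`; the helper class and method names are illustrative.
   
   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   import java.util.HashMap;
   import java.util.Map;
   
   final class ExportDestinations
   {
     // Illustrative only: build a concrete ExportStorageProvider from the SQL-layer
     // properties map via Jackson's polymorphic deserialization keyed on "type".
     static ExportStorageProvider toProvider(
         final ObjectMapper objectMapper,
         final String destinationType,
         final Map<String, String> properties
     )
     {
       final Map<String, Object> providerSpec = new HashMap<>(properties);
       providerSpec.put("type", destinationType);
       return objectMapper.convertValue(providerSpec, ExportStorageProvider.class);
     }
   }
   ```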





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471049270


##########
sql/src/main/java/org/apache/druid/sql/destination/IngestDestination.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * Represents the destination to which the ingested data is written to.
+ */
+@UnstableApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface IngestDestination
+{
+  String getDestinationName();

Review Comment:
   Changed





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471132471


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);
+    try (OutputStream stream = storageConnector.write(exportFilePath)) {
+      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
+
+      SequenceUtils.forEach(
+          cursorSequence,
+          cursor -> {
+            try {
+              formatter.writeResponseStart();

Review Comment:
   Cool! Would that require parser changes? If so, I think adding a header should be the default (in this PR). WDYT?
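   
   If that route is taken, one sketch writes the column names as the first row of each file using only the generic writer calls this processor already uses; whether this matches the final approach is an assumption.
   
   ```java
   // Sketch: emit the column names as the first row of the exported file,
   // before any data rows, using the existing formatter and signature.
   formatter.writeResponseStart();
   formatter.writeRowStart();
   for (String columnName : signature.getColumnNames()) {
     formatter.writeRowField(columnName, columnName);
   }
   formatter.writeRowEnd();
   ```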
   





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480041048


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination to which the data should be exported.
+This variation of EXTERN requires one argument: the details of the destination, as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) are not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage`, which controls the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` extension must be loaded.
+The `S3()` function is a Druid function that configures the connection. Arguments to `S3()` should be passed as named parameters, with the value in single quotes, like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported.                                                                                                                                                                                     | n/a     |

Review Comment:
   We do not want to expose the bucket here.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.destination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Destination used by tasks that write the results as files to an external destination. {@link #resultFormat} denotes
+ * the format of the file created and {@link #exportStorageProvider} denotes the type of external
+ * destination.
+ * <br>
+ * {@link #replaceTimeChunks} denotes how existing files should be handled.
+ * - If the value is null, the results are appended to the existing files.
+ * - If the value is present, existing files will be deleted according to time intervals.
+ */
+public class ExportMSQDestination implements MSQDestination
+{
+  public static final String TYPE = "export";
+  private final ExportStorageProvider exportStorageProvider;
+  private final ResultFormat resultFormat;
+  @Nullable
+  private final List<Interval> replaceTimeChunks;

Review Comment:
   Why is it called `replaceTimeChunks`?



##########
docs/multi-stage-query/concepts.md:
##########
@@ -115,6 +115,14 @@ When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments g
 with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
 for dimension-based pruning, see [Clustering](#clustering).
 
+### Write to an external destination with `EXTERN`
+
+Query tasks can write data to an external destination through the `EXTERN` function, when it is used with the `INTO`
+clause, such as `REPLACE INTO EXTERN(...)` The EXTERN function takes arguments which specifies where to the files should be created.

Review Comment:
   ```suggestion
   clause, such as `INSERT INTO EXTERN(...)` The EXTERN function takes arguments which specifies where to the files should be created.
   ```



##########
processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.local;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConfig;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+@JsonTypeName(LocalFileExportStorageProvider.TYPE_NAME)
+public class LocalFileExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = LocalInputSource.TYPE_KEY;
+
+  @JacksonInject
+  StorageConfig storageConfig;
+
+  @JsonProperty
+  private final String exportPath;
+
+  @JsonCreator
+  public LocalFileExportStorageProvider(@JsonProperty(value = "exportPath", required = true) String exportPath)
+  {
+    this.exportPath = exportPath;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final File exportDestination = validateAndGetPath(storageConfig.getBaseDir(), exportPath);
+    try {
+      return new LocalFileStorageConnector(exportDestination);
+    }
+    catch (IOException e) {
+      throw new IAE(
+          e,
+          "Unable to create storage connector [%s] for base path [%s]",
+          LocalFileStorageConnector.class.getSimpleName(),
+          exportDestination.toPath()
+      );
+    }
+  }
+
+  @Override
+  @JsonIgnore
+  public String getResourceType()
+  {
+    return TYPE_NAME;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LocalFileExportStorageProvider that = (LocalFileExportStorageProvider) o;
+    return Objects.equals(exportPath, that.exportPath);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(exportPath);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "LocalFileExportStorageProvider{" +
+           "exportPath=" + exportPath +
+           '}';
+  }
+
+  public static File validateAndGetPath(String basePath, String customPath)
+  {
+    if (basePath == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.baseDir` must be configured for local export.");

Review Comment:
   The same error-message changes apply here.



##########
docs/multi-stage-query/concepts.md:
##########
@@ -115,6 +115,14 @@ When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments g
 with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
 for dimension-based pruning, see [Clustering](#clustering).
 
+### Write to an external destination with `EXTERN`
+
+Query tasks can write data to an external destination through the `EXTERN` function, when it is used with the `INTO`
+clause, such as `REPLACE INTO EXTERN(...)`. The EXTERN function takes arguments that specify where to write the files.

Review Comment:
   ```suggestion
   clause, such as `INSERT INTO EXTERN(...)`. The EXTERN function takes arguments that specify where to write the files.
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java:
##########
@@ -203,7 +209,22 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
 
     final MSQDestination destination;
 
-    if (targetDataSource != null) {
+    if (targetDataSource instanceof ExportDestination) {
+      ExportDestination exportDestination = ((ExportDestination) targetDataSource);
+      ResultFormat format = ResultFormat.fromString(sqlQueryContext.getString(DruidSqlIngest.SQL_EXPORT_FILE_FORMAT));
+
+      if (replaceTimeChunks != null && !Intervals.ONLY_ETERNITY.equals(replaceTimeChunks)) {

Review Comment:
   This should not be needed. Let's remove it.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.destination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Destination used by tasks that write the results as files to an external destination. {@link #resultFormat} denotes
+ * the format of the file created and {@link #exportStorageProvider} denotes the type of external
+ * destination.
+ * <br>
+ * {@link #replaceTimeChunks} denotes how existing files should be handled.
+ * - If the value is null, the results are appended to the existing files.
+ * - If the value is present, existing files will be deleted according to time intervals.
+ */
+public class ExportMSQDestination implements MSQDestination
+{
+  public static final String TYPE = "export";
+  private final ExportStorageProvider exportStorageProvider;
+  private final ResultFormat resultFormat;

Review Comment:
   Where would Parquet come in, in the future?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1772,16 +1777,10 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         queryToPlan = querySpec.getQuery();
       }
-    } else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
-      shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
-      queryToPlan = querySpec.getQuery();
-    } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
-      shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
-          MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
-      );
-      queryToPlan = querySpec.getQuery();
     } else {
-      throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
+      shuffleSpecFactory = querySpec.getDestination()
+                                    .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));

Review Comment:
   Thanks for the refactor. It's much cleaner now.
   We should add a comment noting that all SELECT partitions are controlled by the context value `rowsPerPage`.
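   For instance, something like this (a rough sketch; the wording is illustrative):
   
   ```java
   // Partition sizing for all SELECT destinations is driven by the query context
   // parameter "rowsPerPage" (default 100,000); each destination's
   // getShuffleSpecFactory() decides how to apply it.
   shuffleSpecFactory = querySpec.getDestination()
                                 .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
   ```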



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,89 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination where you want to export data to.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Keep the following in mind when using EXTERN to export rows:
+- Only INSERT statements are supported.
+- Only `CSV` format is supported as an export format.
+- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements.
+- You can export to Amazon S3 or local storage.
+- The destination provided should contain no other files or directories.
+
+When you export data, use the `rowsPerPage` context parameter to control how many rows get exported. The default is 100,000.
+
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+##### S3
+
+Export results to S3 by passing the function `S3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions`.
+The `S3()` function is a Druid function that configures the connection. Arguments for `S3()` should be passed as named parameters with the value in single quotes like the following example:
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments for the function:
+
+| Parameter   | Required | Description                                                                                                                                                           | Default |
+|-------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                     | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. | n/a     |

Review Comment:
   We should mention that the bucket and prefix must match whatever paths the cluster admin has allowed.
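   For example, a cluster admin might configure something like the following (illustrative values; the property name is the one this PR introduces for S3 export):
   
   ```properties
   # Exports are only permitted to locations under these URIs.
   druid.export.storage.s3.allowedExportPaths=["s3://allowed-bucket/export/"]
   ```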



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.data.input.s3.S3InputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+@JsonTypeName(S3ExportStorageProvider.TYPE_NAME)
+public class S3ExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = S3InputSource.TYPE_KEY;
+  @JsonProperty
+  private final String bucket;
+  @JsonProperty
+  private final String prefix;
+
+  @JacksonInject
+  S3ExportConfig s3ExportConfig;
+  @JacksonInject
+  ServerSideEncryptingAmazonS3 s3;
+
+  @JsonCreator
+  public S3ExportStorageProvider(
+      @JsonProperty(value = "bucket", required = true) String bucket,
+      @JsonProperty(value = "prefix", required = true) String prefix
+  )
+  {
+    this.bucket = bucket;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final String tempDir = s3ExportConfig.getTempDir();
+    if (tempDir == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build("The runtime property `druid.export.storage.s3.tempDir` must be configured for S3 export.");
+    }
+    final List<String> allowedExportPaths = s3ExportConfig.getAllowedExportPaths();
+    if (allowedExportPaths == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.s3.allowedExportPaths` must be configured for S3 export.");
+    }
+    validateS3Prefix(allowedExportPaths, bucket, prefix);
+    final S3OutputConfig s3OutputConfig = new S3OutputConfig(
+        bucket,
+        prefix,
+        new File(tempDir),
+        s3ExportConfig.getChunkSize(),
+        s3ExportConfig.getMaxRetry()
+    );
+    return new S3StorageConnector(s3OutputConfig, s3);
+  }
+
+  @VisibleForTesting
+  static void validateS3Prefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
+  {
+    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(S3StorageDruidModule.SCHEME);
+    for (final String path : allowedExportPaths) {
+      final URI allowedUri = URI.create(path);
+      if (validateUri(allowedUri, providedUri)) {
+        return;
+      }
+    }
+    throw DruidException.forPersona(DruidException.Persona.USER)

Review Comment:
   This is a user-facing error. Please mention that the user should reach out to the cluster admin for the allowed export paths. The paths are controlled via the `druid.export.storage.s3.allowedExportPaths` property.
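   Something along these lines, perhaps (hypothetical wording; `providedUri` is the variable already in scope in `validateS3Prefix`):
   
   ```java
   throw DruidException.forPersona(DruidException.Persona.USER)
                       .ofCategory(DruidException.Category.INVALID_INPUT)
                       .build(
                           "Export destination [%s] is not in the allowed list. Allowed paths are "
                           + "controlled by the runtime property `druid.export.storage.s3.allowedExportPaths`; "
                           + "please reach out to the cluster admin for the allowed export paths.",
                           providedUri
                       );
   ```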



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1872,6 +1871,44 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final StorageConnectorProvider storageConnectorProvider = exportMSQDestination.getStorageConnectorProvider();
+
+      final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
+
+      // If the statement is a 'REPLACE' statement, delete the existing files at the destination.
+      if (exportMSQDestination.getReplaceTimeChunks() != null) {
+        if (Intervals.ONLY_ETERNITY.equals(exportMSQDestination.getReplaceTimeChunks())) {
+          StorageConnector storageConnector = storageConnectorProvider.get();
+          try {
+            storageConnector.deleteRecursively("");

Review Comment:
   Also, code-flow-wise, I think `makeQueryDefinition` may not be the correct place to delete the files.
   Maybe it can be done after we create the query definition object (clear files if needed).



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,89 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination where you want to export data to.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Keep the following in mind when using EXTERN to export rows:
+- Only INSERT statements are supported.
+- Only `CSV` format is supported as an export format.
+- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements.
+- You can export to Amazon S3 or local storage.
+- The destination provided should contain no other files or directories.
+
+When you export data, use the `rowsPerPage` context parameter to control how many rows get exported. The default is 100,000.
+
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+##### S3
+
+Export results to S3 by passing the function `S3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions`.
+The `S3()` function is a Druid function that configures the connection. Arguments for `S3()` should be passed as named parameters with the value in single quotes like the following example:
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments for the function:
+
+| Parameter   | Required | Description                                                                                                                                                           | Default |
+|-------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                     | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                          | Default |
+|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                           | n/a |

Review Comment:
   These are local directories, right? I think the property name should mention that, e.g. `s3.tempLocalDir`.
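   For instance (hypothetical name and value):
   
   ```properties
   # Temporary directory on local disk used to stage files while uploading to S3.
   druid.export.storage.s3.tempLocalDir=/tmp/druid-export
   ```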



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1877,6 +1876,42 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();
+
+      try {
+        // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
+        Iterator<String> filesIterator = exportStorageProvider.get().listDir("");
+        if (filesIterator.hasNext()) {
+          throw DruidException.forPersona(DruidException.Persona.USER)
+                              .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                              .build("Found files at provided export destination. Export is only allowed to "

Review Comment:
   Let's put the path location in the error message.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1877,6 +1876,42 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();
+
+      try {
+        // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
+        Iterator<String> filesIterator = exportStorageProvider.get().listDir("");
+        if (filesIterator.hasNext()) {
+          throw DruidException.forPersona(DruidException.Persona.USER)
+                              .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                              .build("Found files at provided export destination. Export is only allowed to "

Review Comment:
   Also mention that the user can append an empty subdirectory to the path.
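   Combining this with the earlier suggestion to include the path, the message might look something like the following (hypothetical wording):
   
   ```java
   throw DruidException.forPersona(DruidException.Persona.USER)
                       .ofCategory(DruidException.Category.RUNTIME_FAILURE)
                       .build(
                           "Found files at provided export destination [%s]. Export is only allowed to "
                           + "empty directories; consider appending an empty subdirectory to the path.",
                           exportStorageProvider.getBasePath()
                       );
   ```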




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481342610


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.destination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Destination used by tasks that write the results as files to an external destination. {@link #resultFormat} denotes
+ * the format of the file created and {@link #exportStorageProvider} denotes the type of external
+ * destination.
+ * <br>
+ * {@link #replaceTimeChunks} denotes how existing files should be handled.
+ * - If the value is null, the results are appended to the existing files.
+ * - If the value is present, existing files will be deleted according to time intervals.
+ */
+public class ExportMSQDestination implements MSQDestination
+{
+  public static final String TYPE = "export";
+  private final ExportStorageProvider exportStorageProvider;
+  private final ResultFormat resultFormat;

Review Comment:
   We can change it later. The only thing we have to take into account is that the JSON key/value the task query maker sends for CSV should remain unchanged, to support backward compatibility during upgrades.
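   For reference, the serialized destination would look roughly like this (field names inferred from the `@JsonProperty` annotations in this PR; values are illustrative):
   
   ```json
   {
     "type": "export",
     "exportStorageProvider": {"type": "s3", "bucket": "your_bucket", "prefix": "prefix/to/files"},
     "resultFormat": "csv"
   }
   ```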




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481475320


##########
processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.local;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.data.input.impl.LocalInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConfig;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+@JsonTypeName(LocalFileExportStorageProvider.TYPE_NAME)
+public class LocalFileExportStorageProvider implements ExportStorageProvider
+{
+  public static final String TYPE_NAME = LocalInputSource.TYPE_KEY;
+
+  @JacksonInject
+  StorageConfig storageConfig;
+
+  @JsonProperty
+  private final String exportPath;
+
+  @JsonCreator
+  public LocalFileExportStorageProvider(@JsonProperty(value = "exportPath", required = true) String exportPath)
+  {
+    this.exportPath = exportPath;
+  }
+
+  @Override
+  public StorageConnector get()
+  {
+    final File exportDestination = validateAndGetPath(storageConfig.getBaseDir(), exportPath);
+    try {
+      return new LocalFileStorageConnector(exportDestination);
+    }
+    catch (IOException e) {
+      throw new IAE(
+          e,
+          "Unable to create storage connector [%s] for base path [%s]",
+          LocalFileStorageConnector.class.getSimpleName(),
+          exportDestination.toPath()
+      );
+    }
+  }
+
+  @Override
+  @JsonIgnore
+  public String getResourceType()
+  {
+    return TYPE_NAME;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getBasePath()
+  {
+    return exportPath;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LocalFileExportStorageProvider that = (LocalFileExportStorageProvider) o;
+    return Objects.equals(exportPath, that.exportPath);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(exportPath);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "LocalFileExportStorageProvider{" +
+           "exportPath=" + exportPath +
+           '}';
+  }
+
+  public static File validateAndGetPath(String basePath, String customPath)
+  {
+    if (basePath == null) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.NOT_FOUND)
+                          .build(
+                              "The runtime property `druid.export.storage.baseDir` must be configured for local export.");
+    }
+    final File baseDir = new File(basePath);
+    if (!baseDir.isAbsolute()) {
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(
+                              "The runtime property `druid.export.storage.baseDir` must be an absolute path.");
+    }
+    final File exportFile = new File(customPath);
+    if (!exportFile.toPath().normalize().startsWith(baseDir.toPath())) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("The provided destination must be within the path configured by runtime property `druid.export.storage.baseDir` "
+                                 + "Please reach out to the cluster admin for the allowed path. ", customPath);

Review Comment:
   ## Unused format argument
   
   This format call refers to 0 argument(s) but supplies 1 argument(s).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6547)




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461366758


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java:
##########
@@ -153,15 +154,15 @@ public OverlordClient overlordClient()
 
   @Override
   public QueryMaker buildQueryMakerForInsert(
-      final String targetDataSource,
+      final IngestDestination targetDestination,

Review Comment:
   Changed




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461651236


##########
sql/src/main/codegen/includes/insert.ftl:
##########
@@ -21,68 +21,68 @@
  * Parses an INSERT statement. This function is copied from SqlInsert in core/src/main/codegen/templates/Parser.jj,
  * with some changes to allow a custom error message if an OVERWRITE clause is present.
  */
-SqlNode DruidSqlInsert() :
+// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
+SqlNode DruidSqlInsertEof() :
 {
-    final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
-    final SqlNodeList keywordList;
-    final SqlIdentifier tableName;
-    SqlNode tableRef;
-    SqlNode source;
-    final SqlNodeList columnList;
-    final Span s;
-    final Pair<SqlNodeList, SqlNodeList> p;
+  SqlNode insertNode;
+  final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
+  final SqlNodeList keywordList;
+  final SqlIdentifier destination;
+  SqlNode tableRef = null;
+  SqlNode source;
+  final SqlNodeList columnList;
+  final Span s;
+  final Pair<SqlNodeList, SqlNodeList> p;
+  org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
+  SqlNodeList clusteredBy = null;
+  String exportFileFormat = null;
 }
 {
-    (
-        <INSERT>
+  (
+    <INSERT>
     |
-        <UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
-    )
-    { s = span(); }
-    SqlInsertKeywords(keywords) {
-        keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
-    }
-    <INTO> tableName = CompoundTableIdentifier()
-    ( tableRef = TableHints(tableName) | { tableRef = tableName; } )
+    <UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
+  )
+  { s = span(); }
+  SqlInsertKeywords(keywords) {
+    keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
+  }
+  <INTO>
+  (
+    LOOKAHEAD(2)
+    <EXTERN> <LPAREN> destination = ExternalDestination() <RPAREN>
+    |
+    destination = CompoundTableIdentifier()
+    ( tableRef = TableHints(destination) | { tableRef = destination; } )
     [ LOOKAHEAD(5) tableRef = ExtendTable(tableRef) ]
-    (
-        LOOKAHEAD(2)
-        p = ParenthesizedCompoundIdentifierList() {
-            if (p.right.size() > 0) {
-                tableRef = extend(tableRef, p.right);
-            }
-            if (p.left.size() > 0) {
-                columnList = p.left;
-            } else {
-                columnList = null;
-            }
-        }
-    |   { columnList = null; }
-    )
-    (
+  )
+  (
+    LOOKAHEAD(2)

Review Comment:
   This part of the code has not been changed, only moved. It is also present in the default SqlInsert statement in Calcite.




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461684789


##########
sql/src/main/codegen/config.fmpp:
##########
@@ -65,10 +70,16 @@ data: {
       "CLUSTERED"
       "OVERWRITE"
       "PARTITIONED"
+      "EXTERN"
+      "S3"
+      "CSV"

Review Comment:
   These are non-reserved keywords. The way I understand it, the words need to be added to `keywords`, and then, if they should be non-reserved, also added to `nonReservedKeywords` (the way it's done for `OVERWRITE`).




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1470867748


##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()
+  {
+    return properties;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getDestinationName()
+  {
+    return "EXTERN";

Review Comment:
   Changed



##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")

Review Comment:
   Changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471162012


##########
sql/src/main/java/org/apache/druid/sql/calcite/run/EngineFeature.java:
##########
@@ -118,5 +118,9 @@ public enum EngineFeature
    * and cannot concat the results together (as * the result for broker is the query id). Therefore, we don't get the
    * correct result back, while the MSQ engine is executing the partial query
    */
-  ALLOW_TOP_LEVEL_UNION_ALL;
+  ALLOW_TOP_LEVEL_UNION_ALL,
+  /**
+   * Queries can write to an external datasource using {@link org.apache.druid.sql.destination.ExportDestination}
+   */
+  WRITE_EXTERNAL_DATA;

Review Comment:
   Since it's an enum, I didn't want to change the current order. I know this isn't being stored anywhere, so it should be fine, but I figured I'd stick with that.




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe merged PR #15689:
URL: https://github.com/apache/druid/pull/15689



Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "vogievetsky (via GitHub)" <gi...@apache.org>.
vogievetsky commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475327849


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+REPLACE statements have an additional OVERWRITE clause. As partitioning is not yet supported, only `OVERWRITE ALL`
+is allowed. REPLACE deletes any currently existing files at the specified directory, and creates new files with the results of the query.
+
+
+```sql
+REPLACE INTO
+  EXTERN(<destination function>)
+AS CSV
+OVERWRITE ALL
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage. This can be done passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.

Review Comment:
   I would like some more information on what this S3 function with named parameters is. Is it some special case, or is it how we are settling on doing functions with named parameters? Is it a SQL thing, a Calcite thing, or a Druid thing?
   
   I have at one point seen functions with named parameters represented as
   
   ```
   FN(x="a")
   FN(x='a')
   FN(x=>'a')
   ```
   
   Where are all these variations coming from? Can there be quotes on the keys? Are they `"` or `'`? Can these functions also accept non-named (ordinal) parameters?



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+REPLACE statements have an additional OVERWRITE clause. As partitioning is not yet supported, only `OVERWRITE ALL`
+is allowed. REPLACE deletes any currently existing files at the specified directory, and creates new files with the results of the query.
+
+
+```sql
+REPLACE INTO
+  EXTERN(<destination function>)
+AS CSV
+OVERWRITE ALL
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage. This can be done passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+
+```sql
+INSERT INTO
+  EXTERN(S3(bucket=<...>, prefix=<...>, tempDir=<...>))

Review Comment:
   I think this example would be clearer if you used syntax that would actually parse like:
   
   `EXTERN(S3(bucket='s3://your_bucket', prefix='prefix/to/files', tempDir='/var'))`
   
   Otherwise it is very hard to understand what actually needs to go in there. I have read these docs and I still do not understand if the values are quoted with `'` or `"`.




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471487268


##########
sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java:
##########
@@ -106,12 +110,45 @@ protected String operationName()
   @Override
   public void validate()
   {
-    if (ingestNode().getPartitionedBy() == null) {
+    if (ingestNode().getTargetTable() instanceof ExternalDestinationSqlIdentifier) {

Review Comment:
   Changed




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1452967647


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information
+ * required for a task to write to a destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier

Review Comment:
   ## No clone method
   
   No clone method, yet implements Cloneable.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6461)




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461688962


##########
sql/src/main/codegen/config.fmpp:
##########
@@ -65,10 +70,16 @@ data: {
       "CLUSTERED"
       "OVERWRITE"
       "PARTITIONED"
+      "EXTERN"
+      "S3"
+      "CSV"

Review Comment:
   Adding a query which checks this is a good idea, though. I've added a test that should fail if CSV or EXTERN are reserved keywords. A minimal sketch of such a query follows; it parses only under the non-reserved assumption:
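   ```sql
   -- Hypothetical parser check: this query only parses if CSV and EXTERN
   -- remain non-reserved keywords usable as ordinary identifiers.
   SELECT csv, extern
   FROM (SELECT 1 AS csv, 2 AS extern) t
   ```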





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471082148


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);
+    try (OutputStream stream = storageConnector.write(exportFilePath)) {
+      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
+
+      SequenceUtils.forEach(
+          cursorSequence,
+          cursor -> {
+            try {
+              formatter.writeResponseStart();

Review Comment:
   Thanks for pointing this out! Moved the response start and end outside. Will look at adding a header with a separate PR.
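   For context, a minimal sketch of the intended ordering for a single output file, simplified to one cursor (a sketch, not the PR's exact code):
   
   ```java
   // Sketch: the response start/end markers wrap the whole file once; each
   // cursor position contributes exactly one row in between.
   private static void writeRows(ResultFormat.Writer formatter, Cursor cursor, RowSignature signature)
       throws IOException
   {
     final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
     formatter.writeResponseStart();
     while (!cursor.isDone()) {
       formatter.writeRowStart();
       for (String columnName : signature.getColumnNames()) {
         formatter.writeRowField(
             columnName,
             columnSelectorFactory.makeColumnValueSelector(columnName).getObject()
         );
       }
       formatter.writeRowEnd();
       cursor.advance();
     }
     formatter.writeResponseEnd();
   }
   ```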





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1470868204


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);

Review Comment:
   Moved





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475101931


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.StorageConnectorProvider;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final Map<String, String> properties;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      Map<String, String> properties
+  )
+  {
+    super(name, pos);
+    this.properties = properties;
+  }
+
+  public String getDestinationType()

Review Comment:
   Since this is being used for the labels of permissions, please confirm that valid names here line up with the equivalent input source types returned from `InputSource#getTypes`. That way, permission labels will be consistent for reading and writing.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1872,6 +1871,44 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final StorageConnectorProvider storageConnectorProvider = exportMSQDestination.getStorageConnectorProvider();
+
+      final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
+
+      // If the statement is a 'REPLACE' statement, delete the existing files at the destination.
+      if (exportMSQDestination.getReplaceTimeChunks() != null) {
+        if (Intervals.ONLY_ETERNITY.equals(exportMSQDestination.getReplaceTimeChunks())) {
+          StorageConnector storageConnector = storageConnectorProvider.get();
+          try {
+            storageConnector.deleteRecursively("");

Review Comment:
   This worries me because it means that if someone provides a wrong path by accident, we'll delete a bunch of their stuff. It would be better to only delete files that we can confirm came from a previous Druid export. To achieve that we could write our files in a special directory structure, or we could include a manifest file and check for the prior manifest.
   
   The manifest file is probably a good idea anyway, since it makes it easier for readers to be sure they got the entire set of files.
   
   If we use the Hive "symlink manifest" format in particular, then engines like Presto and Spark will be able to more easily read our exports.
   
   Anyway, whatever we do here, I think we should do something different than blindly deleting everything in the path the user provides. It's too likely that it will lead to accidental data deletion.
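   
   For illustration, a minimal sketch of a manifest-gated deletion along these lines; the manifest file name and error wording are hypothetical:
   
   ```java
   // Hypothetical guard: only wipe the destination when a manifest left by a
   // previous Druid export is present, instead of deleting blindly.
   private static final String EXPORT_MANIFEST = "_druid_export_manifest"; // assumed name
   
   private void clearDestination(StorageConnector storageConnector) throws IOException
   {
     if (storageConnector.pathExists(EXPORT_MANIFEST)) {
       storageConnector.deleteRecursively("");
     } else if (storageConnector.listDir("").hasNext()) {
       throw DruidException.forPersona(DruidException.Persona.USER)
                           .ofCategory(DruidException.Category.INVALID_INPUT)
                           .build("Refusing to overwrite a directory that does not contain a prior Druid export");
     }
   }
   ```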



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java:
##########
@@ -176,4 +188,83 @@ public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
 
     msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
   }
+
+  @Test
+  public void testExport() throws Exception

Review Comment:
   Please add an integration test for the access-denied scenario, to ensure that people without the proper EXTERNAL permission cannot do exports.
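   A rough sketch of the shape such a test might take; `submitTaskExpectingFailure` is an illustrative helper, not an existing method:
   
   ```java
   @Test
   public void testExportWithoutExternalPermissionFails() throws Exception
   {
     // Hypothetical: run the export as a user without WRITE on the EXTERNAL
     // resource and assert that task submission is rejected.
     String queryLocal = StringUtils.format(
         "INSERT INTO EXTERN(S3()) AS CSV SELECT __time, page FROM %s",
         datasource
     );
     msqHelper.submitTaskExpectingFailure(queryLocal, HttpResponseStatus.FORBIDDEN);
   }
   ```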





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475852597


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+REPLACE statements have an additional OVERWRITE clause. As partitioning is not yet supported, only `OVERWRITE ALL`
+is allowed. REPLACE deletes any currently existing files at the specified directory, and creates new files with the results of the query.
+
+
+```sql
+REPLACE INTO
+  EXTERN(<destination function>)
+AS CSV
+OVERWRITE ALL
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage. This can be done passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.

Review Comment:
   Going through the currently used syntax, it would make sense to use `FN(x=>'a')` for export as well, so this should be changed. I've also added a line about using single quotes to the docs. The functions would now allow named parameters. I've added this information to the docs as well.
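   For reference, a sketch of the resulting syntax; the parameter names and values here are illustrative placeholders, not a confirmed list:
   
   ```sql
   REPLACE INTO
     EXTERN(S3(bucket => 'your-bucket', prefix => 'exports/wikipedia'))
   AS CSV
   OVERWRITE ALL
   SELECT * FROM wikipedia
   ```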





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480834663


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql

Review Comment:
   Removed this line. Added a new line, `- The destination provided should be empty.`, which is more accurate.





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481342610


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.destination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Destination used by tasks that write the results as files to an external destination. {@link #resultFormat} denotes
+ * the format of the file created and {@link #exportStorageProvider} denotes the type of external
+ * destination.
+ * <br>
+ * {@link #replaceTimeChunks} denotes how existing files should be handled.
+ * - If the value is null, the results are appended to the existing files.
+ * - If the value is present, existing files will be deleted according to time intervals.
+ */
+public class ExportMSQDestination implements MSQDestination
+{
+  public static final String TYPE = "export";
+  private final ExportStorageProvider exportStorageProvider;
+  private final ResultFormat resultFormat;

Review Comment:
   We can change it later. The only thing we have to take into account is that the JSON key the task query maker sends should remain unchanged for CSV when we decide to change it.
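   A minimal sketch of the kind of compatibility test this implies, assuming `ResultFormat.CSV` serializes to its lowercase name:
   
   ```java
   @Test
   public void testCsvJsonKeyIsStable() throws Exception
   {
     // Hypothetical guard: the wire value the task query maker sends for CSV
     // must stay "csv", even if the format type is refactored later.
     ObjectMapper mapper = new DefaultObjectMapper();
     Assert.assertEquals("\"csv\"", mapper.writeValueAsString(ResultFormat.CSV));
     Assert.assertEquals(ResultFormat.CSV, mapper.readValue("\"csv\"", ResultFormat.class));
   }
   ```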





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "317brian (via GitHub)" <gi...@apache.org>.
317brian commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480475160


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.

Review Comment:
   ```suggestion
   This variation of EXTERN has two required parts: an argument that details the destination and an `AS` clause to specify the format of the exported rows.
   ```





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "317brian (via GitHub)" <gi...@apache.org>.
317brian commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481720709


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.

Review Comment:
   How about the change I just made?





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1464797845


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageConnectorFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.google.inject.Injector;
+import org.apache.druid.storage.export.ExportStorageConnectorFactory;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnectorProvider;
+
+import java.io.File;
+import java.util.Map;
+
+public class S3ExportStorageConnectorFactory implements ExportStorageConnectorFactory
+{
+  @Override
+  public StorageConnectorProvider get(Map<String, String> properties, Injector injector)
+  {
+    return new S3StorageConnectorProvider(
+        properties.get("bucket"),
+        properties.get("prefix"),
+        new File(properties.get("tempDir")),
+        HumanReadableBytes.valueOf(Integer.parseInt(properties.get("chunkSize"))),

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6502)



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageConnectorFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3.output;
+
+import com.google.inject.Injector;
+import org.apache.druid.storage.export.ExportStorageConnectorFactory;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.storage.StorageConnectorProvider;
+
+import java.io.File;
+import java.util.Map;
+
+public class S3ExportStorageConnectorFactory implements ExportStorageConnectorFactory
+{
+  @Override
+  public StorageConnectorProvider get(Map<String, String> properties, Injector injector)
+  {
+    return new S3StorageConnectorProvider(
+        properties.get("bucket"),
+        properties.get("prefix"),
+        new File(properties.get("tempDir")),
+        HumanReadableBytes.valueOf(Integer.parseInt(properties.get("chunkSize"))),
+        Integer.parseInt(properties.get("maxRetry")),

Review Comment:
   ## Missing catch of NumberFormatException
   
   Potential uncaught 'java.lang.NumberFormatException'.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6501)
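   A minimal sketch of one way to address both findings; the helper name and error wording are illustrative:
   
   ```java
   // Hypothetical helper: parse a required integer property, surfacing a clear
   // user-facing error instead of an uncaught NumberFormatException.
   private static int parseIntProperty(Map<String, String> properties, String key)
   {
     final String value = properties.get(key);
     try {
       return Integer.parseInt(value);
     }
     catch (NumberFormatException e) {
       throw DruidException.forPersona(DruidException.Persona.USER)
                           .ofCategory(DruidException.Category.INVALID_INPUT)
                           .build(e, "Expected an integer for property [%s], got [%s]", key, value);
     }
   }
   ```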





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1461757028


##########
sql/src/main/java/org/apache/druid/sql/calcite/parser/ExternalDestinationSqlIdentifier.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import com.google.common.collect.Iterables;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.catalog.model.table.export.ExportDestination;
+
+import java.util.Map;
+
+/**
+ * Extends the {@link SqlIdentifier} to hold parameters for an external table destination. This contains information
+ * required for a task to write to a destination.
+ */
+public class ExternalDestinationSqlIdentifier extends SqlIdentifier
+{
+  private final ExportDestination exportDestination;
+  private final Map<String, String> propertiesForUnparse;
+
+  public ExternalDestinationSqlIdentifier(
+      String name,
+      SqlParserPos pos,
+      ExportDestination exportDestination,
+      Map<String, String> propertiesForUnparse
+  )
+  {
+    super(name, pos);
+    this.exportDestination = exportDestination;
+    this.propertiesForUnparse = propertiesForUnparse;
+  }
+
+  public ExportDestination getExportDestination()
+  {
+    return exportDestination;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
+  {
+    SqlWriter.Frame externFrame = writer.startFunCall("EXTERN");
+    SqlWriter.Frame frame = writer.startFunCall(Iterables.getOnlyElement(names));

Review Comment:
   Changed to CollectionUtils.getOnlyElement



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/MSQSelectDestination.java:
##########
@@ -30,6 +30,10 @@ public enum MSQSelectDestination
    * Writes all the results directly to the report.
    */
   TASKREPORT("taskReport", false),
+  /**
+   * Writes the results as rows to a location.

Review Comment:
   Updated the comment





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1469499794


##########
docs/multi-stage-query/reference.md:
##########
@@ -45,8 +45,11 @@ making it easy to reuse the same SQL statement for each ingest: just specify the
 
 ### `EXTERN` Function
 
-Use the `EXTERN` function to read external data. The function has two variations.
+Use the `EXTERN` function to read external data or write to an external source.

Review Comment:
   When we are writing to an external location, it is not a source. Something like the following might be better. 
   ```suggestion
   Use the `EXTERN` function to read external data or write to an external location.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used as a destination, which will export the data to the specified location and format. EXTERN when
+used in this way accepts one argument. Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`)
+is not currently supported with export statements.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination. The statements require an `AS`
+clause that determines the format.
+Currently, only `CSV` is supported as a format.

Review Comment:
   ```suggestion
   Only `CSV` format is supported at the moment
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used as a destination, which will export the data to the specified location and format. EXTERN when
+used in this way accepts one argument. Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`)
+is not currently supported with export statements.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination. The statements require an `AS`
+clause that determines the format.
+Currently, only `CSV` is supported as a format.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+REPLACE statements have an additional OVERWRITE clause. As partitioning is not yet supported, only `OVERWRITE ALL`
+is allowed. REPLACE deletes any existing files at the destination and creates new files with the results of the query.

Review Comment:
   "deletes any existing files" -> Does it clean up the directory, or just replace any files and paths conflicting with what the query generates the results for? 



##########
docs/multi-stage-query/concepts.md:
##########
@@ -115,6 +115,17 @@ When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments g
 with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
 for dimension-based pruning, see [Clustering](#clustering).
 
+### Write to an external destination with `EXTERN`
+
+Query tasks can write data to an external destination through the `EXTERN` function, when it is used with the `INTO`
+clause, such as `REPLACE INTO EXTERN(...)`
+
+The EXTERN function takes arguments which specifies where to the files should be created.
+
+The format can be specified using an `AS` clause.
+

Review Comment:
   nit: Too much line break



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used as a destination, which will export the data to the specified location and format. EXTERN when

Review Comment:
   ```suggestion
   `EXTERN` can be used to specify a destination, where the data needs to be exported.  This variation of EXTERN requires one argument - (What the argument is)
   ```
   minor nit: Also, from the docs, the format is specified via `AS` clause and not extern, therefore I have omitted that from the suggestion



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used as a destination, which will export the data to the specified location and format. EXTERN when
+used in this way accepts one argument. Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`)
+is not currently supported with export statements.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination. The statements require an `AS`
+clause that determines the format.

Review Comment:
   ```suggestion
   clause that specifies the format.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,66 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used as a destination, which will export the data to the specified location and format. EXTERN when
+used in this way accepts one argument. Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`)
+is not currently supported with export statements.
+
+INSERT statements and REPLACE statements are both supported with an `EXTERN` destination. The statements require an `AS`
+clause that determines the format.
+Currently, only `CSV` is supported as a format.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+REPLACE statements have an additional OVERWRITE clause. As partitioning is not yet supported, only `OVERWRITE ALL`
+is allowed. REPLACE deletes any existing files at the destination and creates new files with the results of the query.
+
+```sql
+REPLACE INTO
+  EXTERN(<destination function>)
+AS CSV
+OVERWRITE ALL
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported to Amazon S3 storage. The S3 extension is required to be loaded for this.
+This can be done passing the function `S3()` as an argument to the `EXTERN` function.

Review Comment:
   ```suggestion
   Exporting is currently supported for Amazon S3 storage. This can be done passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
   
   ```





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1471132471


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);
+    try (OutputStream stream = storageConnector.write(exportFilePath)) {
+      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
+
+      SequenceUtils.forEach(
+          cursorSequence,
+          cursor -> {
+            try {
+              formatter.writeResponseStart();

Review Comment:
   Cool! Would that require parser changes? 





Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1470559070


##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()

Review Comment:
   Can we have subtypes, like we do for something like a storage connector? We'd have `S3ExportDestination`, and we can expand it further later. I think that looks better than a generic properties map.
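   For illustration, a rough sketch of the subtype approach; `S3ExportDestination` and its fields are hypothetical:
   
   ```java
   // Hypothetical polymorphic destination, mirroring how storage connectors are
   // registered by type name rather than carrying a generic properties map.
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
   @JsonSubTypes({
       @JsonSubTypes.Type(name = "s3", value = S3ExportDestination.class)
   })
   interface TypedExportDestination
   {
   }
   
   @JsonTypeName("s3")
   class S3ExportDestination implements TypedExportDestination
   {
     private final String bucket;
     private final String prefix;
   
     @JsonCreator
     public S3ExportDestination(
         @JsonProperty("bucket") String bucket,
         @JsonProperty("prefix") String prefix
     )
     {
       this.bucket = bucket;
       this.prefix = prefix;
     }
   
     @JsonProperty
     public String getBucket()
     {
       return bucket;
     }
   
     @JsonProperty
     public String getPrefix()
     {
       return prefix;
     }
   }
   ```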



##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")

Review Comment:
   Can we use something more appropriate than "destinationType"? "destination" seems cleaner.



##########
sql/src/main/java/org/apache/druid/sql/destination/IngestDestination.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * Represents the destination to which the ingested data is written to.

Review Comment:
   ```suggestion
    * Represents the destination where the data is ingested
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);
+    try (OutputStream stream = storageConnector.write(exportFilePath)) {
+      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
+
+      SequenceUtils.forEach(
+          cursorSequence,
+          cursor -> {
+            try {
+              formatter.writeResponseStart();

Review Comment:
   Should this be written at the cursor level or at the file level? If the former, the current code is good; if the latter, this should be written outside the `.forEach()`.
   The FrameStorageAdapter returns a single cursor, so it doesn't make a difference today, but if that were to change in the future the code should still be correct.
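   For concreteness, the file-level version would look roughly like this (a sketch against the code above; `writeRowsFromCursor` is a hypothetical helper standing in for the existing row-writing loop):
   ```java
   // Sketch: emit the response start/end markers once per file, and only the
   // rows themselves inside the per-cursor lambda.
   try (OutputStream stream = storageConnector.write(exportFilePath)) {
     final ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
     formatter.writeResponseStart();

     SequenceUtils.forEach(
         cursorSequence,
         cursor -> {
           try {
             // Rows only; no response-level markers here.
             writeRowsFromCursor(cursor, formatter, signature);
           }
           catch (IOException e) {
             throw new RuntimeException(e);
           }
         }
     );

     formatter.writeResponseEnd();
   }
   ```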



##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()
+  {
+    return properties;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getDestinationName()
+  {
+    return "EXTERN";

Review Comment:
   Why is there a difference between this "EXTERN" and ExportDestination.TYPE_KEY? Let's remove this, add a `getType()` method (which probably means we don't need the `@JsonIgnore` on it, because it'd get serialized anyway), and use `getType()` instead of `getDestinationName()`.
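   i.e., something along these lines (a sketch only):
   ```java
   import com.fasterxml.jackson.annotation.JsonTypeName;

   // Sketch: the interface exposes the serialized type key directly.
   interface IngestDestination
   {
     String getType();
   }

   @JsonTypeName(ExportDestination.TYPE_KEY)
   class ExportDestination implements IngestDestination
   {
     static final String TYPE_KEY = "external";

     @Override
     public String getType()
     {
       // No @JsonIgnore needed: the "type" property gets serialized anyway.
       return TYPE_KEY;
     }
   }
   ```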



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.junit.Test;
+
+public class CalciteExportTest extends CalciteIngestionDmlTest
+{
+  @Test
+  public void testReplaceIntoExtern()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO EXTERN(s3(bucket='bucket1',prefix='prefix1',tempDir='/tempdir',chunkSize='5242880',maxRetry='1')) "
+             + "AS CSV "
+             + "OVERWRITE ALL "
+             + "SELECT dim2 FROM foo")
+        .expectQuery(
+            Druids.newScanQueryBuilder()
+                  .dataSource(
+                      "foo"
+                  )
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .columns("dim2")
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .build()
+        )
+        .expectResources(dataSourceRead("foo"))
+        .expectTarget("EXTERN", RowSignature.builder().add("dim2", ColumnType.STRING).build())
+        .verify();
+  }
+
+  @Test
+  public void testExportWithPartitionedBy()
+  {
+    testIngestionQuery()
+        .sql("REPLACE INTO EXTERN(s3(bucket='bucket1',prefix='prefix1',tempDir='/tempdir',chunkSize='5242880',maxRetry='1')) "
+             + "AS CSV "
+             + "OVERWRITE ALL "
+             + "SELECT dim2 FROM foo "
+             + "PARTITIONED BY ALL")
+        .expectValidationError(
+            DruidException.class,
+            "Export statements do not currently support a PARTITIONED BY or CLUSTERED BY clause."

Review Comment:
   ```suggestion
               "Export statements do not support a PARTITIONED BY or CLUSTERED BY clause."
   ```
   A change in the error message



##########
sql/src/main/java/org/apache/druid/sql/destination/IngestDestination.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * Represents the destination to which the ingested data is written to.
+ */
+@UnstableApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+public interface IngestDestination
+{
+  String getDestinationName();

Review Comment:
   `String getType()` (as  suggested in the previous comment)



##########
sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.destination;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Destination that represents an ingestion to an external source.
+ */
+@JsonTypeName(ExportDestination.TYPE_KEY)
+public class ExportDestination implements IngestDestination
+{
+  public static final String TYPE_KEY = "external";
+  private final String destinationType;
+  private final Map<String, String> properties;
+
+  public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties)
+  {
+    this.destinationType = destinationType;
+    this.properties = properties;
+  }
+
+  @JsonProperty("destinationType")
+  public String getDestinationType()
+  {
+    return destinationType;
+  }
+
+  @JsonProperty("properties")
+  public Map<String, String> getProperties()
+  {
+    return properties;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getDestinationName()
+  {
+    return "EXTERN";

Review Comment:
   Something like what the `Query<T>` class does.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);

Review Comment:
   This can be moved into the constructor, so it needn't be called for each frame.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.util.SequenceUtils;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ExportResultsFrameProcessor implements FrameProcessor<Object>
+{
+  private final ReadableFrameChannel inputChannel;
+  private final ResultFormat exportFormat;
+  private final FrameReader frameReader;
+  private final StorageConnector storageConnector;
+  private final ObjectMapper jsonMapper;
+  private final int partitionNumber;
+  private final int workerNumber;
+  private final ChannelCounters channelCounter;
+
+  public ExportResultsFrameProcessor(
+      ReadableFrameChannel inputChannel,
+      ResultFormat exportFormat,
+      FrameReader frameReader,
+      StorageConnector storageConnector,
+      ObjectMapper jsonMapper,
+      int partitionNumber,
+      int workerNumber,
+      ChannelCounters channelCounter
+  )
+  {
+    this.inputChannel = inputChannel;
+    this.exportFormat = exportFormat;
+    this.frameReader = frameReader;
+    this.storageConnector = storageConnector;
+    this.jsonMapper = jsonMapper;
+    this.partitionNumber = partitionNumber;
+    this.workerNumber = workerNumber;
+    this.channelCounter = channelCounter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.singletonList(inputChannel);
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    if (readableInputs.isEmpty()) {
+      return ReturnOrAwait.awaitAll(1);
+    }
+
+    if (inputChannel.isFinished()) {
+      return ReturnOrAwait.returnObject(Unit.instance());
+    } else {
+      exportFrame(inputChannel.read());
+      return ReturnOrAwait.awaitAll(1);
+    }
+  }
+
+  private void exportFrame(final Frame frame) throws IOException
+  {
+    final RowSignature signature = frameReader.signature();
+
+    final Sequence<Cursor> cursorSequence =
+        new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+            .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+
+    final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat);
+    try (OutputStream stream = storageConnector.write(exportFilePath)) {
+      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
+
+      SequenceUtils.forEach(
+          cursorSequence,
+          cursor -> {
+            try {
+              formatter.writeResponseStart();

Review Comment:
   There's a `writeHeader()` method in the `formatter`'s class. Is that call not required? Perhaps the user would like the CSV to include the column names as a header row. That could also be a user-toggled input, something like what CSVParser does (`hasHeaderRow`); we could have a field like `writeHeaderRow`. This seems useful, particularly for CSV.
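   For example (a sketch; `writeHeaderRow` is a hypothetical flag, and the exact `writeHeader` signature should be checked against `ResultFormat.Writer`):
   ```java
   // Sketch: optionally emit a header row with the column names before the data.
   try (OutputStream stream = storageConnector.write(exportFilePath)) {
     final ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
     formatter.writeResponseStart();

     if (writeHeaderRow) {
       // Hypothetical toggle, analogous to CSVParser's hasHeaderRow.
       formatter.writeHeader(signature, false, false);
     }

     // ... row-writing loop as in the current code ...

     formatter.writeResponseEnd();
   }
   ```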




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "LakshSingla (via GitHub)" <gi...@apache.org>.
LakshSingla commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1470632991


##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.export.TestExportStorageConnector;
+import org.apache.druid.msq.test.MSQTestBase;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.http.ResultFormat;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MSQExportTest extends MSQTestBase
+{
+  @Test
+  public void testExport() throws IOException

Review Comment:
   Let's add one test where we are controlling the number of lines per file, using the query context. 
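   Something like the following, for instance (a rough sketch only; `testStorage()` as the EXTERN target and the exact builder calls are assumptions based on `MSQTestBase` conventions, not code from this PR):
   ```java
   @Test
   public void testExportRowsPerPage() throws IOException
   {
     // Sketch: a rowsPerPage of 1 should split the export across several files.
     testIngestQuery().setSql(
                          "INSERT INTO EXTERN(testStorage()) AS CSV SELECT __time, dim1 FROM foo")
                      .setQueryContext(ImmutableMap.of("rowsPerPage", 1))
                      .verifyResults();

     // Afterwards, assert that TestExportStorageConnector received multiple
     // files, each containing at most one data row.
   }
   ```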




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "cryptoe (via GitHub)" <gi...@apache.org>.
cryptoe commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1475585658


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1872,6 +1871,44 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         return queryDef;
       }
+    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+      final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
+      final StorageConnectorProvider storageConnectorProvider = exportMSQDestination.getStorageConnectorProvider();
+
+      final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
+
+      // If the statement is a 'REPLACE' statement, delete the existing files at the destination.
+      if (exportMSQDestination.getReplaceTimeChunks() != null) {
+        if (Intervals.ONLY_ETERNITY.equals(exportMSQDestination.getReplaceTimeChunks())) {
+          StorageConnector storageConnector = storageConnectorProvider.get();
+          try {
+            storageConnector.deleteRecursively("");

Review Comment:
   I also think a manifest file is a better alternative. Since it would be a bit more work, what we can do now is remove the delete call, document it as a known bug, and get this into Druid 29.
   After that, we can add the manifest-based deletion as part of a follow-up PR.
   How does that plan sound to you, @gianm @adarshsanjeev?
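   As a rough sketch of the manifest idea (all names here are illustrative, and it assumes the existing `StorageConnector` exists/read/delete methods):
   ```java
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStreamReader;
   import java.nio.charset.StandardCharsets;
   import org.apache.druid.storage.StorageConnector;

   // Sketch: REPLACE deletes only the files recorded in a manifest that the
   // export itself wrote, instead of deleting the whole prefix recursively.
   class ExportManifestCleaner
   {
     private static final String MANIFEST_FILE = "druid-export-manifest.txt";

     void deletePreviousExport(final StorageConnector connector) throws IOException
     {
       if (!connector.pathExists(MANIFEST_FILE)) {
         return; // nothing we previously exported; leave unrelated files alone
       }
       try (BufferedReader reader = new BufferedReader(
           new InputStreamReader(connector.read(MANIFEST_FILE), StandardCharsets.UTF_8))) {
         String file;
         while ((file = reader.readLine()) != null) {
           connector.deleteFile(file); // delete only files listed in the manifest
         }
       }
       connector.deleteFile(MANIFEST_FILE);
     }
   }
   ```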




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481441700


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.destination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Destination used by tasks that write the results as files to an external destination. {@link #resultFormat} denotes
+ * the format of the file created and {@link #exportStorageProvider} denotes the type of external
+ * destination.
+ * <br>
+ * {@link #replaceTimeChunks} denotes how existing files should be handled.
+ * - If the value is null, the results are appended to the existing files.
+ * - If the value is present, existing files will be deleted according to time intervals.
+ */
+public class ExportMSQDestination implements MSQDestination
+{
+  public static final String TYPE = "export";
+  private final ExportStorageProvider exportStorageProvider;
+  private final ResultFormat resultFormat;
+  @Nullable
+  private final List<Interval> replaceTimeChunks;

Review Comment:
   Removed this. This was initially added to support partitioning.




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481441354


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.destination;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.msq.querykit.ShuffleSpecFactory;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Destination used by tasks that write the results as files to an external destination. {@link #resultFormat} denotes
+ * the format of the file created and {@link #exportStorageProvider} denotes the type of external
+ * destination.
+ * <br>
+ * {@link #replaceTimeChunks} denotes how existing files should be handled.
+ * - If the value is null, the results are appended to the existing files.
+ * - If the value is present, existing files will be deleted according to time intervals.
+ */
+public class ExportMSQDestination implements MSQDestination
+{
+  public static final String TYPE = "export";
+  private final ExportStorageProvider exportStorageProvider;
+  private final ResultFormat resultFormat;

Review Comment:
   Yes, we should be able to change the `ResultFormat` here while keeping the JSON the same.




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1481440460


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1772,16 +1777,10 @@ private static QueryDefinition makeQueryDefinition(
       } else {
         queryToPlan = querySpec.getQuery();
       }
-    } else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
-      shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
-      queryToPlan = querySpec.getQuery();
-    } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
-      shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
-          MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
-      );
-      queryToPlan = querySpec.getQuery();
     } else {
-      throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
+      shuffleSpecFactory = querySpec.getDestination()
+                                    .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));

Review Comment:
   Do you mean a comment everywhere the function is called? We don't pass the whole context to `getShuffleSpecFactory()`, just the integer, so would this need to be specifically mentioned somewhere?
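   For reference, the shape under discussion is roughly (a sketch of the interface method, not the exact declaration):
   ```java
   // Only the page size reaches the destination, not the full query context,
   // so any context-related caveat would have to live at the call sites.
   public interface MSQDestination
   {
     ShuffleSpecFactory getShuffleSpecFactory(int rowsPerPage);
   }
   ```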




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "317brian (via GitHub)" <gi...@apache.org>.
317brian commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480461042


##########
docs/multi-stage-query/concepts.md:
##########
@@ -115,6 +115,14 @@ When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments g
 with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
 for dimension-based pruning, see [Clustering](#clustering).
 
+### Write to an external destination with `EXTERN`
+
+Query tasks can write data to an external destination through the `EXTERN` function, when it is used with the `INTO`
+clause, such as `REPLACE INTO EXTERN(...)` The EXTERN function takes arguments which specifies where to the files should be created.
+The format can be specified using an `AS` clause.

Review Comment:
   ```suggestion
   Query tasks can write data to an external destination through the `EXTERN` function when it is used with the `INTO`
   clause, such as `REPLACE INTO EXTERN(...)`. The EXTERN function takes arguments that specify where to write the files.
   The format can be specified using an `AS` clause.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.

Review Comment:
   ```suggestion
   This variation of EXTERN requires two arguments: the details of the destination and an `AS` clause to specify the format of the exported rows.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql

Review Comment:
   ```suggestion
   INSERT statements append the results to the existing files at the destination.
   
   ```sql
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                        | Default |
+|----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                         | n/a |
+| `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes which are whitelisted as export destinations. Export query fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |

Review Comment:
   ```suggestion
   | `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                        | Default |
+|----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                         | n/a |
+| `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes which are whitelisted as export destinations. Export query fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |
+| `druid.export.storage.s3.maxRetry`           | No       | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors.                                                                                                                                    | 10  |
+| `druid.export.storage.s3.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
+
+##### LOCAL
+
+Exporting is also supported to the local storage, which exports the results to the filesystem of the MSQ worker.
+This is useful in a single node setup or for testing, and is not suitable for production use cases.
+
+This can be done by passing the function `LOCAL()` as an argument to the `EXTERN FUNCTION`.
+Arguments to `LOCAL()` should be passed as named parameters with the value in single quotes like the example below.
+
+To use local as an export destination, the runtime property `druid.export.storage.baseDir` must be configured on the indexer/middle manager.
+The parameter provided to the `LOCAL()` function will be prefixed with this value when exporting to a local destination.

Review Comment:
   ```suggestion
   
   Arguments to `LOCAL()` should be passed as named parameters with the value in single quotes in the following example:
   
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.

Review Comment:
   ```suggestion
   Keep the following in mind when using EXTERN to export rows:
   
   - Only INSERT statements are supported.
   - Only `CSV` format is supported as an export format.
   - Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements.
   - You can export to Amazon S3 or local storage.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.

Review Comment:
   ```suggestion
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value

Review Comment:
   When you export data, use the `rowsPerPage` context parameter to control how many rows are written to each exported file. The default is 100,000.
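   
   For illustration, a minimal sketch of such an export query (the bucket and prefix are placeholders; `rowsPerPage` is supplied through the query context rather than in the SQL text):
   
   ```sql
   -- Hypothetical export query. rowsPerPage is set in the query context,
   -- e.g. {"rowsPerPage": 50000}; it does not appear in the SQL statement.
   INSERT INTO
     EXTERN(
       S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
     )
   AS CSV
   SELECT page, added
   FROM wikipedia
   ```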



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                        | Default |
+|----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                         | n/a |
+| `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes which are whitelisted as export destinations. Export query fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |
+| `druid.export.storage.s3.maxRetry`           | No       | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors.                                                                                                                                    | 10  |
+| `druid.export.storage.s3.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
+
+##### LOCAL
+
+Exporting is also supported to the local storage, which exports the results to the filesystem of the MSQ worker.

Review Comment:
   ```suggestion
   You can export to local storage, which writes the results to the filesystem of the MSQ worker.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.

Review Comment:
   ```suggestion
   `EXTERN` can be used to specify a destination where you want to export data.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.

Review Comment:
   ```suggestion
   Export results to S3 by passing the function `S3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions` extension to be loaded.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:

Review Comment:
   ```suggestion
   Supported arguments for the function:
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.

Review Comment:
   ```suggestion
   The `S3()` function is a Druid function that configures the connection. Arguments for `S3()` should be passed as named parameters with the value in single quotes, as shown in the following example:
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                        | Default |
+|----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                         | n/a |
+| `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes which are whitelisted as export destinations. Export query fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |
+| `druid.export.storage.s3.maxRetry`           | No       | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors.                                                                                                                                    | 10  |
+| `druid.export.storage.s3.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
+
+##### LOCAL
+
+Exporting is also supported to the local storage, which exports the results to the filesystem of the MSQ worker.
+This is useful in a single node setup or for testing, and is not suitable for production use cases.
+
+This can be done by passing the function `LOCAL()` as an argument to the `EXTERN FUNCTION`.
+Arguments to `LOCAL()` should be passed as named parameters with the value in single quotes like the example below.
+
+To use local as an export destination, the runtime property `druid.export.storage.baseDir` must be configured on the indexer/middle manager.
+The parameter provided to the `LOCAL()` function will be prefixed with this value when exporting to a local destination.
+
+```sql
+INSERT INTO
+  EXTERN(
+    local(exportPath => 'exportLocation/query1')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                              | Default |
+|-------------|--------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --|
+| `exportPath`  | Yes | Subdirectory of `druid.export.storage.baseDir` used to as the destination to export the results to. The export query expects the destination to be empty. If the location includes other files or directories, then the query will fail. | n/a |

Review Comment:
   ```suggestion
   | `exportPath`  | Yes | Subdirectory of `druid.export.storage.baseDir` used as the destination to export the results to. The export query expects the destination to be empty. If the location includes other files or directories, then the query will fail. | n/a |
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |

Review Comment:
   ```suggestion
   | `prefix`    | Yes      | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, the query will fail.                                                         | n/a     |
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                        | Default |
+|----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                         | n/a |
+| `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes which are whitelisted as export destinations. Export query fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |
+| `druid.export.storage.s3.maxRetry`           | No       | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors.                                                                                                                                    | 10  |
+| `druid.export.storage.s3.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
+
+##### LOCAL
+
+Exporting is also supported to the local storage, which exports the results to the filesystem of the MSQ worker.
+This is useful in a single node setup or for testing, and is not suitable for production use cases.

Review Comment:
   ```suggestion
   This is useful in a single node setup or for testing but is not suitable for production use cases.
   ```



##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value
+is 100,000.
+
+INSERT statements append the results to the existing files at the destination.
+```sql
+INSERT INTO
+  EXTERN(<destination function>)
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Exporting is currently supported for Amazon S3 storage and local storage.
+
+##### S3
+
+Exporting results to S3 can be done by passing the function `S3()` as an argument to the `EXTERN` function. The `druid-s3-extensions` should be loaded.
+The `S3()` function is a druid function which configures the connection. Arguments to `S3()` should be passed as named parameters with the value in single quotes like the example below.
+
+```sql
+INSERT INTO
+  EXTERN(
+    S3(bucket => 's3://your_bucket', prefix => 'prefix/to/files')
+  )
+AS CSV
+SELECT
+  <column>
+FROM <table>
+```
+
+Supported arguments to the function:
+
+| Parameter   | Required | Description                                                                                                                                                                                                                        | Default |
+|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| `bucket`    | Yes      | The S3 bucket to which the files are exported to.                                                                                                                                                                                  | n/a     |
+| `prefix`    | Yes      | Path where the exported files would be created. The export query would expect the destination to be empty. If the location includes other files, then the query will fail.                                                         | n/a     |
+
+The following runtime parameters must be configured to export into an S3 destination:
+
+| Runtime Parameter                            | Required | Description                                                                                                                                                                                                                        | Default |
+|----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
+| `druid.export.storage.s3.tempSubDir`         | Yes      | Directory used to store temporary files required while uploading the data.                                                                                                                                                         | n/a |
+| `druid.export.storage.s3.allowedExportPaths` | Yes      | An array of S3 prefixes which are whitelisted as export destinations. Export query fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]`     | n/a |
+| `druid.export.storage.s3.maxRetry`           | No       | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors.                                                                                                                                    | 10  |
+| `druid.export.storage.s3.chunkSize`          | No       | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
+
+##### LOCAL
+
+Exporting is also supported to the local storage, which exports the results to the filesystem of the MSQ worker.
+This is useful in a single node setup or for testing, and is not suitable for production use cases.
+
+This can be done by passing the function `LOCAL()` as an argument to the `EXTERN FUNCTION`.

Review Comment:
   ```suggestion
   Export results to local storage by passing the function `LOCAL()` as an argument to the `EXTERN` function. To use local storage as an export destination, the runtime property `druid.export.storage.baseDir` must be configured on the Indexer/Middle Manager.
   The parameter provided to the `LOCAL()` function is prefixed with this value when exporting to a local destination.
   ```
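   
   To make the prefixing behavior concrete, a minimal sketch (the base directory value and export path here are hypothetical):
   
   ```sql
   -- Assuming druid.export.storage.baseDir=/opt/druid/export is set on the
   -- Indexer/Middle Manager, this hypothetical query writes its CSV files
   -- under /opt/druid/export/query1.
   INSERT INTO
     EXTERN(
       local(exportPath => 'query1')
     )
   AS CSV
   SELECT page, added
   FROM wikipedia
   ```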




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "317brian (via GitHub)" <gi...@apache.org>.
317brian commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480480808


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.
+
+Only INSERT statements are supported with an `EXTERN` destination.
+Only `CSV` format is supported at the moment.
+Please note that partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) is not currently supported with export statements.
+
+Export statements support the context parameter `rowsPerPage` for the number of rows in each exported file. The default value

Review Comment:
   When you export data, use the `rowsPerPage` context parameter to control how many rows are written to each exported file. The default is 100,000.




Re: [PR] Add export capabilities to MSQ with SQL syntax (druid)

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #15689:
URL: https://github.com/apache/druid/pull/15689#discussion_r1480833441


##########
docs/multi-stage-query/reference.md:
##########
@@ -90,6 +93,93 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.
 
 For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).
 
+#### `EXTERN` to export to a destination
+
+`EXTERN` can be used to specify a destination, where the data needs to be exported.
+This variation of EXTERN requires one argument, the details of the destination as specified below.
+This variation additionally requires an `AS` clause to specify the format of the exported rows.

Review Comment:
   The `AS` clause would not be an argument to `EXTERN`; it's present elsewhere in the query.
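   
   A minimal sketch of the placement (the destination and columns are hypothetical): the `AS` clause sits between the `EXTERN(...)` destination and the query body, not inside the `EXTERN` call.
   
   ```sql
   -- AS CSV follows EXTERN(...); it is not an argument of the EXTERN function.
   INSERT INTO
     EXTERN(
       local(exportPath => 'query2')
     )
   AS CSV
   SELECT page
   FROM wikipedia
   ```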


