You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/09/01 05:01:41 UTC

[GitHub] [incubator-doris] morningman commented on a change in pull request #6539: Support concurrent export of query results

morningman commented on a change in pull request #6539:
URL: https://github.com/apache/incubator-doris/pull/6539#discussion_r699833595



##########
File path: fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
##########
@@ -45,6 +48,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.doris.backup.S3Storage.S3_PROPERTIES_PREFIX;

Review comment:
       Remove this static import (`S3_PROPERTIES_PREFIX`).

##########
File path: be/src/exec/data_sink.cpp
##########
@@ -78,14 +80,29 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         }
         sink->reset(tmp_sink);
         break;
-    case TDataSinkType::MEMORY_SCRATCH_SINK:
+    }
+    case TDataSinkType::RESULT_FILE_SINK: {
+        if (!thrift_sink.__isset.result_file_sink) {
+            return Status::InternalError("Missing result file sink.");
+        }
+        if (params.__isset.destinations && params.destinations.size() > 0) {

Review comment:
       Add comments explaining this `destinations` check.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
##########
@@ -298,9 +285,76 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
         return selectNode;
     }
 
+    private void pushDownResultFileSink(Analyzer analyzer) {

Review comment:
       Add some comments?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
##########
@@ -78,6 +78,13 @@ public ExchangeNode(PlanNodeId id, PlanNode inputNode, boolean copyConjuncts) {
         computeTupleIds();
     }
 
+    public boolean isMergingExchange() {
+        if (planNodeName.equals(MERGING_EXCHANGE_NODE)) {

Review comment:
       Would it be better to check `mergeInfo` here instead of comparing the node name?

##########
File path: be/src/runtime/file_result_writer.h
##########
@@ -31,6 +32,7 @@ class RuntimeProfile;
 class TupleRow;
 
 struct ResultFileOptions {
+    // deprecated
     bool is_local_file;

Review comment:
       Why not delete it?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
##########
@@ -771,12 +774,17 @@ public boolean isExtractWideRangeExpr() {
         return extractWideRangeExpr;
     }
 
+<<<<<<< HEAD

Review comment:
       ?? This looks like an unresolved merge-conflict marker (`<<<<<<< HEAD`).

##########
File path: be/src/runtime/file_result_writer.cpp
##########
@@ -392,7 +454,8 @@ Status FileResultWriter::close() {
     // so does the profile in RuntimeState.
     COUNTER_SET(_written_rows_counter, _written_rows);
     SCOPED_TIMER(_writer_close_timer);
-    return _close_file_writer(true);
+    RETURN_IF_ERROR(_close_file_writer(true, false));

Review comment:
       Just return the result directly: `return _close_file_writer(true, false);`

##########
File path: fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
##########
@@ -298,9 +285,76 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
         return selectNode;
     }
 
+    private void pushDownResultFileSink(Analyzer analyzer) {
+        if (fragments.size() < 1) {
+            return;
+        }
+        if (!(fragments.get(0).getSink() instanceof ResultFileSink)) {
+            return;
+        }
+        if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) {
+            return;
+        }
+        if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) {
+            return;
+        }
+        PlanFragment topPlanFragment = fragments.get(0);
+        ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot();
+        // try to push down result file sink
+        if (topPlanNode.isMergingExchange()) {
+            return;
+        }
+        PlanFragment secondPlanFragment = fragments.get(1);
+        ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink();
+        if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) {
+            return;
+        }
+        if (secondPlanFragment.getOutputExprs() != null) {
+            return;
+        }
+        // create result file sink desc
+        TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer);
+        resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
+        resultFileSink.setOutputTupleId(fileStatusDesc.getId());
+        secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
+        secondPlanFragment.resetSink(resultFileSink);
+        ResultSink resultSink = new ResultSink(topPlanNode.getId());
+        topPlanFragment.resetSink(resultSink);
+        topPlanFragment.resetOutputExprs(fileStatusDesc);
+        topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
+    }
+
+    private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) {

Review comment:
       Add some comments

##########
File path: be/src/runtime/file_result_writer.cpp
##########
@@ -39,13 +42,36 @@ namespace doris {
 
 const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
 
+// deprecated
 FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
                                    const std::vector<ExprContext*>& output_expr_ctxs,
                                    RuntimeProfile* parent_profile, BufferControlBlock* sinker)
         : _file_opts(file_opts),
           _output_expr_ctxs(output_expr_ctxs),
           _parent_profile(parent_profile),
-          _sinker(sinker) {}
+          _sinker(sinker) {
+        if (_file_opts->is_local_file) {
+            _storage_type = TStorageBackendType::LOCAL;
+        } else {
+            _storage_type = TStorageBackendType::BROKER;
+        }
+        _fragment_instance_id.hi = 12345678987654321;

Review comment:
       What's this hard-coded value (`12345678987654321`)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org