You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2021/02/18 16:58:14 UTC

[arrow] branch patching created (now 68976f1)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a change to branch patching
in repository https://gitbox.apache.org/repos/asf/arrow.git.


      at 68976f1  WIP: track name of scan task

This branch includes the following new commits:

     new ec2ca8c  WIP: add basic span class
     new 68976f1  WIP: track name of scan task

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow] 01/02: WIP: add basic span class

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch patching
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit ec2ca8c845cf155c2fe2df61e7b9df84b88ffde0
Author: David Li <li...@gmail.com>
AuthorDate: Wed Feb 17 15:56:55 2021 -0500

    WIP: add basic span class
---
 cpp/src/arrow/CMakeLists.txt   |  1 +
 cpp/src/arrow/flight/server.cc | 19 ++++++++----
 cpp/src/arrow/util/span.cc     | 65 ++++++++++++++++++++++++++++++++++++++++++
 cpp/src/arrow/util/span.h      | 22 ++++++++++++++
 4 files changed, 101 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4403def..6211d2b 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -193,6 +193,7 @@ set(ARROW_SRCS
     util/key_value_metadata.cc
     util/memory.cc
     util/mutex.cc
+    util/span.cc
     util/string.cc
     util/string_builder.cc
     util/task_group.cc
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 4e35950..70967dc 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -44,6 +44,7 @@
 #include "arrow/status.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/span.h"
 #include "arrow/util/uri.h"
 
 #include "arrow/flight/internal.h"
@@ -658,12 +659,18 @@ class FlightServiceImpl : public FlightService::Service {
     // Consume data stream and write out payloads
     while (true) {
       FlightPayload payload;
-      SERVICE_RETURN_NOT_OK(flight_context, data_stream->Next(&payload));
-      if (payload.ipc_message.metadata == nullptr ||
-          !internal::WritePayload(payload, writer))
-        // No more messages to write, or connection terminated for some other
-        // reason
-        break;
+      {
+        Span span("DoGet::FlightDataStream::Next");
+        SERVICE_RETURN_NOT_OK(flight_context, data_stream->Next(&payload));
+      }
+      {
+        Span span("DoGet::WritePayload");
+        if (payload.ipc_message.metadata == nullptr ||
+            !internal::WritePayload(payload, writer))
+          // No more messages to write, or connection terminated for some other
+          // reason
+          break;
+      }
     }
     RETURN_WITH_MIDDLEWARE(flight_context, grpc::Status::OK);
   }
diff --git a/cpp/src/arrow/util/span.cc b/cpp/src/arrow/util/span.cc
new file mode 100644
index 0000000..c6b0c4b
--- /dev/null
+++ b/cpp/src/arrow/util/span.cc
@@ -0,0 +1,85 @@
+#include "arrow/util/span.h"
+
+#include <chrono>
+#include <cstdint>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+namespace {
+
+// Current reading of std::chrono::steady_clock in nanoseconds. steady_clock
+// is monotonic but its epoch is unspecified, so only the difference between
+// two readings (e.g. end - start) is meaningful.
+int64_t GetNowNs() {
+  return std::chrono::duration_cast<std::chrono::nanoseconds>(
+             std::chrono::steady_clock::now().time_since_epoch())
+      .count();
+}
+
+}  // namespace
+
+// Private state of a Span: its name, the creating thread, the start time and
+// any attributes attached before the span is ended.
+class Span::Impl {
+ public:
+  explicit Impl(const std::string& name)
+      : name_(name),
+        thread_id_(std::this_thread::get_id()),
+        start_ns_(GetNowNs()) {}
+
+  // Set attribute `key` to `value`, replacing any earlier value for `key`
+  // (unordered_map::insert would silently keep the first one).
+  void AddAttribute(const std::string& key, const std::string& value) {
+    attributes_[key] = value;
+  }
+
+  // Log the span at DEBUG level. Only the first call has an effect, so an
+  // explicit End() followed by ~Span() produces a single log record.
+  void End() {
+    if (ended_) {
+      return;
+    }
+    ended_ = true;
+    const auto end_ns = GetNowNs();
+    ARROW_LOG(DEBUG) << "span"
+                     << " name=" << name_ << ";thread_id=" << thread_id_
+                     << ";start=" << start_ns_ << ";end=" << end_ns
+                     << AttributesKeyValueString();
+  }
+
+ private:
+  // Render attributes as ";key=value" pairs, or "" if there are none. Pairs
+  // appear in unordered_map iteration order, which is unspecified.
+  std::string AttributesKeyValueString() const {
+    std::ostringstream result;
+    for (const auto& pair : attributes_) {
+      result << ';' << pair.first << '=' << pair.second;
+    }
+    return result.str();
+  }
+
+  std::string name_;
+  std::thread::id thread_id_;
+  int64_t start_ns_;
+  bool ended_ = false;
+  std::unordered_map<std::string, std::string> attributes_;
+};
+
+Span::Span(const std::string& name) : impl_(new Impl(name)) {}
+
+// Ending on destruction lets a Span be used as a scope guard.
+Span::~Span() { End(); }
+
+void Span::AddAttribute(const std::string& key, const std::string& value) {
+  impl_->AddAttribute(key, value);
+}
+
+void Span::End() { impl_->End(); }
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/span.h b/cpp/src/arrow/util/span.h
new file mode 100644
index 0000000..1dff353
--- /dev/null
+++ b/cpp/src/arrow/util/span.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+/// \brief A named, timed region of execution.
+///
+/// The start time is recorded at construction. End() reports the span
+/// (currently by logging it at DEBUG level); the destructor calls End(), so
+/// a Span can also be used as a scope guard without an explicit End().
+class ARROW_EXPORT Span final {
+ public:
+  explicit Span(const std::string& name);
+  ~Span();
+
+  // Non-copyable: every copy would end, and report, the same span again.
+  Span(const Span&) = delete;
+  Span& operator=(const Span&) = delete;
+
+  /// \brief Attach a key/value attribute that is reported with the span.
+  void AddAttribute(const std::string& key, const std::string& value);
+
+  /// \brief End the span and report it.
+  void End();
+
+ private:
+  class Impl;
+
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace arrow


[arrow] 02/02: WIP: track name of scan task

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch patching
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 68976f1a93e7997d2be20cecb1fad4552e147803
Author: David Li <li...@gmail.com>
AuthorDate: Wed Feb 17 16:13:23 2021 -0500

    WIP: track name of scan task
---
 cpp/src/arrow/dataset/file_parquet.cc    | 11 +++++++++--
 cpp/src/arrow/dataset/scanner.h          |  4 ++++
 cpp/src/arrow/dataset/scanner_internal.h |  4 ++++
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 6c9f0ec..19c21a3 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -54,11 +54,12 @@ using parquet::arrow::StatisticsAsScalars;
 /// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file.
 class ParquetScanTask : public ScanTask {
  public:
-  ParquetScanTask(int row_group, std::vector<int> column_projection,
+  ParquetScanTask(std::string name, int row_group, std::vector<int> column_projection,
                   std::shared_ptr<parquet::arrow::FileReader> reader,
                   std::function<void()> pre_buffer, std::shared_ptr<ScanOptions> options,
                   std::shared_ptr<ScanContext> context)
       : ScanTask(std::move(options), std::move(context)),
+        name_(std::move(name)),
         row_group_(row_group),
         column_projection_(std::move(column_projection)),
         reader_(std::move(reader)),
@@ -89,7 +90,12 @@ class ParquetScanTask : public ScanTask {
     return MakeFunctionIterator(std::move(NextBatch));
   }
 
+  std::string name() override {
+    return name_;
+  }
+
  private:
+  std::string name_;
   int row_group_;
   std::vector<int> column_projection_;
   std::shared_ptr<parquet::arrow::FileReader> reader_;
@@ -361,7 +367,8 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions
   std::function<void()> pre_buffer = PreBufferTask;
 
   for (size_t i = 0; i < row_groups.size(); ++i) {
-    tasks[i] = std::make_shared<ParquetScanTask>(row_groups[i], column_projection, reader,
+    tasks[i] = std::make_shared<ParquetScanTask>(fragment->source().path(),
+                                                 row_groups[i], column_projection, reader,
                                                  pre_buffer, options, context);
   }
 
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index e65ac7f..93fc43f 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -104,6 +104,10 @@ class ARROW_DS_EXPORT ScanTask {
   /// particular ScanTask implementation
   virtual Result<RecordBatchIterator> Execute() = 0;
 
+  /// \brief A human-readable name for this task, for diagnostics such as
+  /// tracing. Returns "(unknown)" unless a subclass overrides it.
+  virtual std::string name() { return "(unknown)"; }
+
   virtual ~ScanTask() = default;
 
   const std::shared_ptr<ScanOptions>& options() const { return options_; }
@@ -128,6 +132,8 @@ class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask {
 
   Result<RecordBatchIterator> Execute() override;
 
+  std::string name() override { return "(memory)"; }
+
  protected:
   std::vector<std::shared_ptr<RecordBatch>> record_batches_;
 };
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index 729bd1f..1d15ea8 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -92,6 +92,10 @@ class FilterAndProjectScanTask : public ScanTask {
     return ProjectRecordBatch(std::move(filter_it), &projector_, context_->pool);
   }
 
+  std::string name() override {
+    return task_->name();
+  }
+
  private:
   std::shared_ptr<ScanTask> task_;
   Expression partition_;