You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2021/02/22 14:50:58 UTC

[arrow] branch patching created (now ab15813)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a change to branch patching
in repository https://gitbox.apache.org/repos/asf/arrow.git.


      at ab15813  Add overload for future

This branch includes the following new commits:

     new b17e41f  Spans, scan task name
     new ab15813  Add overload for future

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow] 01/02: Spans, scan task name

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch patching
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit b17e41fe682b5df5808885bfe253412fc1c777a9
Author: David Li <li...@gmail.com>
AuthorDate: Wed Feb 17 15:56:55 2021 -0500

    Spans, scan task name
---
 cpp/src/arrow/CMakeLists.txt             |  1 +
 cpp/src/arrow/dataset/file_parquet.cc    | 11 ++++--
 cpp/src/arrow/dataset/scanner.h          |  4 ++
 cpp/src/arrow/dataset/scanner_internal.h |  2 +
 cpp/src/arrow/flight/server.cc           | 19 +++++++---
 cpp/src/arrow/util/span.cc               | 63 ++++++++++++++++++++++++++++++++
 cpp/src/arrow/util/span.h                | 22 +++++++++++
 7 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4403def..6211d2b 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -193,6 +193,7 @@ set(ARROW_SRCS
     util/key_value_metadata.cc
     util/memory.cc
     util/mutex.cc
+    util/span.cc
     util/string.cc
     util/string_builder.cc
     util/task_group.cc
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index c26ad04..7c9085f 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -53,7 +53,7 @@ using parquet::arrow::StatisticsAsScalars;
 /// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file.
 class ParquetScanTask : public ScanTask {
  public:
-  ParquetScanTask(int row_group, std::vector<int> column_projection,
+  ParquetScanTask(std::string name, int row_group, std::vector<int> column_projection,
                   std::shared_ptr<parquet::arrow::FileReader> reader,
                   std::shared_ptr<std::once_flag> pre_buffer_once,
                   std::vector<int> pre_buffer_row_groups,
@@ -62,6 +62,7 @@ class ParquetScanTask : public ScanTask {
                   std::shared_ptr<ScanOptions> options,
                   std::shared_ptr<ScanContext> context)
       : ScanTask(std::move(options), std::move(context)),
+        name_(std::move(name)),
         row_group_(row_group),
         column_projection_(std::move(column_projection)),
         reader_(std::move(reader)),
@@ -113,7 +114,10 @@ class ParquetScanTask : public ScanTask {
     return Status::OK();
   }
 
+  std::string name() override { return name_; }
+
  private:
+  std::string name_;
   int row_group_;
   std::vector<int> column_projection_;
   std::shared_ptr<parquet::arrow::FileReader> reader_;
@@ -361,8 +365,9 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions
 
   for (size_t i = 0; i < row_groups.size(); ++i) {
     tasks[i] = std::make_shared<ParquetScanTask>(
-        row_groups[i], column_projection, reader, pre_buffer_once, row_groups,
-        reader_options.async_context, reader_options.cache_options, options, context);
+        fragment->source().path(), row_groups[i], column_projection, reader,
+        pre_buffer_once, row_groups, reader_options.async_context,
+        reader_options.cache_options, options, context);
   }
 
   return MakeVectorIterator(std::move(tasks));
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index e65ac7f..5a9870c 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -104,6 +104,8 @@ class ARROW_DS_EXPORT ScanTask {
   /// particular ScanTask implementation
   virtual Result<RecordBatchIterator> Execute() = 0;
 
+  virtual std::string name() { return "(unknown)"; }
+
   virtual ~ScanTask() = default;
 
   const std::shared_ptr<ScanOptions>& options() const { return options_; }
@@ -128,6 +130,8 @@ class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask {
 
   Result<RecordBatchIterator> Execute() override;
 
+  std::string name() override { return "(memory)"; };
+
  protected:
   std::vector<std::shared_ptr<RecordBatch>> record_batches_;
 };
diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h
index 729bd1f..5d56bc8 100644
--- a/cpp/src/arrow/dataset/scanner_internal.h
+++ b/cpp/src/arrow/dataset/scanner_internal.h
@@ -92,6 +92,8 @@ class FilterAndProjectScanTask : public ScanTask {
     return ProjectRecordBatch(std::move(filter_it), &projector_, context_->pool);
   }
 
+  std::string name() override { return task_->name(); }
+
  private:
   std::shared_ptr<ScanTask> task_;
   Expression partition_;
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 4e35950..70967dc 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -44,6 +44,7 @@
 #include "arrow/status.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/span.h"
 #include "arrow/util/uri.h"
 
 #include "arrow/flight/internal.h"
@@ -658,12 +659,18 @@ class FlightServiceImpl : public FlightService::Service {
     // Consume data stream and write out payloads
     while (true) {
       FlightPayload payload;
-      SERVICE_RETURN_NOT_OK(flight_context, data_stream->Next(&payload));
-      if (payload.ipc_message.metadata == nullptr ||
-          !internal::WritePayload(payload, writer))
-        // No more messages to write, or connection terminated for some other
-        // reason
-        break;
+      {
+        Span span("DoGet::FlightDataStream::Next");
+        SERVICE_RETURN_NOT_OK(flight_context, data_stream->Next(&payload));
+      }
+      {
+        Span span("DoGet::WritePayload");
+        if (payload.ipc_message.metadata == nullptr ||
+            !internal::WritePayload(payload, writer))
+          // No more messages to write, or connection terminated for some other
+          // reason
+          break;
+      }
     }
     RETURN_WITH_MIDDLEWARE(flight_context, grpc::Status::OK);
   }
diff --git a/cpp/src/arrow/util/span.cc b/cpp/src/arrow/util/span.cc
new file mode 100644
index 0000000..396c178
--- /dev/null
+++ b/cpp/src/arrow/util/span.cc
@@ -0,0 +1,63 @@
+#include "arrow/util/span.h"
+
+#include <chrono>
+#include <cstdint>
+#include <sstream>
+#include <thread>
+#include <unordered_map>
+
+#include "arrow/util/logging.h"
+
+namespace arrow {
+
+int64_t GetNowNs() {
+  std::chrono::time_point<std::chrono::steady_clock, std::chrono::nanoseconds> now(
+      std::chrono::steady_clock::now());
+  return now.time_since_epoch().count();
+}
+
+class Span::Impl {
+ public:
+  explicit Impl(const std::string& name)
+      : name_(name), thread_id_(std::this_thread::get_id()), start_ns_(GetNowNs()) {}
+
+  void AddAttribute(const std::string& key, const std::string& value) {
+    attributes_.insert({key, value});
+  }
+
+  void End() {
+    const auto end_ns = GetNowNs();
+    ARROW_LOG(DEBUG) << "span"
+                     << " name=" << name_ << ";thread_id=" << thread_id_
+                     << ";start=" << start_ns_ << ";end=" << end_ns
+                     << AttributesKeyValueString();
+  }
+
+  std::string AttributesKeyValueString() const {
+    if (attributes_.empty()) {
+      return "";
+    }
+    std::stringstream result;
+    for (const auto& pair : attributes_) {
+      result << ';' << pair.first << '=' << pair.second;
+    }
+    return result.str();
+  }
+
+ private:
+  std::string name_;
+  std::thread::id thread_id_;
+  int64_t start_ns_;
+  std::unordered_map<std::string, std::string> attributes_;
+};
+
+Span::Span(const std::string& name) : impl_(new Impl(name)) {}
+Span::~Span() { End(); }
+
+void Span::AddAttribute(const std::string& key, const std::string& value) {
+  impl_->AddAttribute(key, value);
+}
+
+void Span::End() { impl_->End(); }
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/span.h b/cpp/src/arrow/util/span.h
new file mode 100644
index 0000000..57b7be7
--- /dev/null
+++ b/cpp/src/arrow/util/span.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <memory>
+#include <string>
+
+namespace arrow {
+
+class Span final {
+ public:
+  explicit Span(const std::string& name);
+  ~Span();
+
+  void AddAttribute(const std::string& key, const std::string& value);
+  void End();
+
+ private:
+  class Impl;
+
+  std::shared_ptr<Impl> impl_;
+};
+
+}  // namespace arrow


[arrow] 02/02: Add overload for future

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch patching
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit ab15813b037bb880a8a8c6f825dceaf7fad341fd
Author: David Li <da...@ursacomputing.com>
AuthorDate: Mon Feb 22 09:48:40 2021 -0500

    Add overload for future
---
 cpp/src/arrow/util/future.h | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index ee053cf..6a7ed19 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -493,6 +493,17 @@ class ARROW_MUST_USE_TYPE Future {
     });
   }
 
+  Future(Result<ValueType> res) : Future() {
+    if (ARROW_PREDICT_TRUE(res.ok())) {
+      impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
+    } else {
+      impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
+    }
+    SetResult(std::move(res));
+  }
+
+  Future(Status s) : Future(Result<ValueType>(std::move(s))) {}
+
  protected:
   template <typename OnComplete>
   struct Callback {