You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/17 02:37:40 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #11912: ARROW-14445: [C++] Memory resources management

westonpace commented on a change in pull request #11912:
URL: https://github.com/apache/arrow/pull/11912#discussion_r771058164



##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+
+#include <array>
+#include <memory>
+#include <mutex>
+#include <random>
+
+#include "arrow/compute/exec.h"
+#include "arrow/util/logging.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+// Returns the qualified enumerator name (e.g. "MemoryLevel::kCpuLevel") for
+// `memory_level`, or "MemoryLevel::<unknown>" for any value without a name
+// (including MemoryLevel::kNumLevels).
+//
+// A switch is used instead of a table indexed by the enum's underlying value so the
+// mapping cannot drift from the enum's declaration order (kGpuLevel is 0).
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  switch (memory_level) {
+    case MemoryLevel::kGpuLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kGpuLevel);
+    case MemoryLevel::kCpuLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kCpuLevel);
+    case MemoryLevel::kDiskLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kDiskLevel);
+    default:
+      break;
+  }
+  return "MemoryLevel::<unknown>";
+}
+
+// Human-readable name of the level this resource was constructed for.
+std::string MemoryResource::ToString() const {
+  const MemoryLevel level = memory_level_;
+  return MemoryLevelName(level);
+}
+
+// Defined out of line; members release themselves.
+MemoryResources::~MemoryResources() = default;
+
+// Creates a new default-constructed registry owned by the caller. `new` is used
+// directly, presumably because the constructor is not public — confirm in the header.
+std::unique_ptr<MemoryResources> MemoryResources::Make() {
+  std::unique_ptr<MemoryResources> registry(new MemoryResources());
+  return registry;
+}
+
+// Registers `resource` in the slot for its memory level.
+//
+// Returns Status::Invalid if `resource` is null or reports a level outside the
+// registry's range (e.g. MemoryLevel::kNumLevels), and Status::KeyError if a resource
+// is already registered for that level.
+Status MemoryResources::AddMemoryResource(std::shared_ptr<MemoryResource> resource) {
+  if (resource == nullptr) {
+    return Status::Invalid("Cannot register a null memory resource");
+  }
+  auto level = static_cast<size_t>(resource->memory_level());
+  // stats_ is indexed directly by level with unchecked operator[]; reject bad levels.
+  if (level >= stats_.size()) {
+    return Status::Invalid("Memory resource has out-of-range level: ",
+                           static_cast<int>(resource->memory_level()));
+  }
+  if (stats_[level] != nullptr) {
+    return Status::KeyError("Already have a resource type registered with name: ",
+                            resource->ToString());
+  }
+  stats_[level] = std::move(resource);
+  return Status::OK();
+}
+
+// NOTE(review): returns stats_.size(), which (if stats_ is a fixed-size array, as the
+// indexing elsewhere suggests) is the number of slots, not the number of registered
+// resources — memory_resources() skips empty slots. Confirm what callers expect.
+size_t MemoryResources::size() const {
+  return stats_.size();
+}
+
+// Returns a non-owning pointer to the resource registered for `memory_level`.
+//
+// Returns Status::Invalid for a level outside the registry's range (e.g.
+// MemoryLevel::kNumLevels) and Status::KeyError if nothing is registered for it.
+Result<MemoryResource*> MemoryResources::memory_resource(MemoryLevel memory_level) const {
+  auto level = static_cast<size_t>(memory_level);
+  // Guard the unchecked operator[] below.
+  if (level >= stats_.size()) {
+    return Status::Invalid("Memory level out of range: ",
+                           static_cast<int>(memory_level));
+  }
+  if (stats_[level] == nullptr) {
+    return Status::KeyError("No memory resource registered with level: ",
+                            MemoryLevelName(memory_level));
+  }
+  return stats_[level].get();
+}
+
+// Returns non-owning pointers to every registered resource in slot (level) order,
+// skipping levels with nothing registered. Ownership stays with this registry.
+std::vector<MemoryResource*> MemoryResources::memory_resources() const {
+  std::vector<MemoryResource*> registered;
+  for (const auto& slot : stats_) {
+    if (slot) {
+      registered.push_back(slot.get());
+    }
+  }
+  return registered;
+}
+
+namespace {
+
+// Returns the machine's total physical RAM in bytes, or 0 if it cannot be queried.
+// Swap space is not included, and no container (cgroup) or per-process memory limit
+// is consulted, so inside a container this may exceed the memory actually available.
+size_t GetTotalMemorySize() {
+#ifdef __APPLE__
+  int mib[2] = {CTL_HW, HW_MEMSIZE};
+  size_t physical_memory = 0;
+  size_t length = sizeof(physical_memory);
+  if (sysctl(mib, 2, &physical_memory, &length, nullptr, 0) != 0) {
+    return 0;
+  }
+  return physical_memory;
+#elif defined(_WIN32)
+  MEMORYSTATUSEX status;
+  status.dwLength = sizeof(status);
+  if (!GlobalMemoryStatusEx(&status)) {
+    return 0;
+  }
+  return static_cast<size_t>(status.ullTotalPhys);
+#else  // Linux
+  struct sysinfo si;
+  if (sysinfo(&si) != 0) {
+    return 0;
+  }
+  // totalram (not freeram) is the total usable RAM; sysinfo sizes are expressed in
+  // units of mem_unit bytes.
+  return static_cast<size_t>(si.totalram) * static_cast<size_t>(si.mem_unit);
+#endif
+}
+
+// MemoryResource for main memory. memory_used() reports the bytes currently
+// allocated from `pool`; memory_limit() is `memory_limit_threshold` times the
+// machine's total physical RAM as returned by GetTotalMemorySize().
+struct CPUMemoryResource : public MemoryResource {
+  explicit CPUMemoryResource(arrow::MemoryPool* pool, float memory_limit_threshold = 0.75)
+      : MemoryResource(MemoryLevel::kCpuLevel),
+        pool_(pool),
+        total_memory_size_(static_cast<int64_t>(GetTotalMemorySize())) {
+    // Multiply in double: a float cannot represent large byte counts exactly.
+    memory_limit_ = static_cast<int64_t>(
+        std::round(static_cast<double>(memory_limit_threshold) *
+                   static_cast<double>(total_memory_size_)));
+  }
+
+  int64_t memory_used() override { return pool_->bytes_allocated(); }
+
+  int64_t memory_limit() override { return memory_limit_; }
+
+ private:
+  arrow::MemoryPool* pool_;
+  int64_t memory_limit_;
+  int64_t total_memory_size_;
+};
+
+// Builds the default registry: currently a single CPU resource backed by `pool`.
+std::unique_ptr<MemoryResources> CreateBuiltInMemoryResources(MemoryPool* pool) {
+  auto resources = MemoryResources::Make();
+
+  // CPU MemoryLevel
+  auto cpu_level = std::make_shared<CPUMemoryResource>(pool);
+  DCHECK_OK(resources->AddMemoryResource(std::move(cpu_level)));
+
+  // Disk MemoryLevel: not registered yet.
+
+  return resources;
+}
+
+}  // namespace
+
+// Returns a process-wide MemoryResources registry, creating it on the first call.
+//
+// NOTE: `pool` only matters on the first call. That call builds the registry (a CPU
+// resource backed by that pool) and keeps it in a function-local static. Later calls
+// return the same registry and ignore their `pool` argument, so memory_used() keeps
+// reporting allocations from the first caller's pool.
+MemoryResources* GetMemoryResources(MemoryPool* pool) {
+  static auto resources = CreateBuiltInMemoryResources(pool);
+  return resources.get();
+}

Review comment:
       In the other places where we have this "process-wide default" we use a singleton pattern.  Do we need a process-wide default here?  If so, should we use the singleton pattern?
   
   If we don't want a process-wide default then what is the difference between this method and `MemoryResources::Make`?

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+/// Levels under which a MemoryResource can be registered; enumerators are numbered
+/// from 0 in declaration order.
+enum class MemoryLevel : int {
+  kGpuLevel,
+  kCpuLevel,
+  kDiskLevel,
+  /// Count of the levels above; not a level itself.
+  kNumLevels
+};
+
+class ARROW_EXPORT MemoryResource {

Review comment:
       The name `MemoryResource` seems a little off to me since "disk" isn't really "memory".  Can we name it a `StorageResource` or something like that?

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+/// Levels under which a MemoryResource can be registered; enumerators are numbered
+/// from 0 in declaration order.
+enum class MemoryLevel : int {
+  kGpuLevel,
+  kCpuLevel,
+  kDiskLevel,
+  /// Count of the levels above; not a level itself.
+  kNumLevels
+};

Review comment:
       `kGpuLevel` and `kDiskLevel` are not used anywhere.  Also, at the moment, I think `kGpuLevel` is going to provide more confusion than explanation.  Can we leave it out until we are using the GPU somewhere?  I think leaving `kDiskLevel` in is ok, but maybe add a comment with a TODO to a follow-up PR.  Something like "Limit the max size of temporary spillover storage".

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+/// Levels under which a MemoryResource can be registered; enumerators are numbered
+/// from 0 in declaration order.
+enum class MemoryLevel : int {
+  kGpuLevel,
+  kCpuLevel,
+  kDiskLevel,
+  /// Count of the levels above; not a level itself.
+  kNumLevels
+};
+
+/// Abstract resource at one MemoryLevel that reports its current usage and limit.
+class ARROW_EXPORT MemoryResource {
+ public:
+  explicit MemoryResource(MemoryLevel memory_level) : memory_level_{memory_level} {}
+
+  virtual ~MemoryResource() = default;
+
+  /// The level passed at construction.
+  MemoryLevel memory_level() const { return memory_level_; }
+
+  /// Name of memory_level().
+  std::string ToString() const;
+
+  /// Maximum usage allowed; presumably in bytes — confirm for each implementation.
+  virtual int64_t memory_limit() = 0;
+
+  /// Current usage, in the same unit as memory_limit().
+  virtual int64_t memory_used() = 0;
+
+ private:
+  MemoryLevel memory_level_;
+};
+
+class ARROW_EXPORT MemoryResources {
+ public:
+  ~MemoryResources();
+
+  static std::unique_ptr<MemoryResources> Make();
+
+  Status AddMemoryResource(std::shared_ptr<MemoryResource> resource);
+
+  size_t size() const;
+
+  Result<MemoryResource*> memory_resource(MemoryLevel level) const;

Review comment:
       Rather than expose the underlying `MemoryResource` could we just move `memory_used()` and `memory_limit()` to this class?  Then `MemoryResource` can be an implementation detail.

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+/// Levels under which a MemoryResource can be registered; enumerators are numbered
+/// from 0 in declaration order.
+enum class MemoryLevel : int {
+  kGpuLevel,
+  kCpuLevel,
+  kDiskLevel,
+  /// Count of the levels above; not a level itself.
+  kNumLevels
+};
+
+/// Abstract resource at one MemoryLevel that reports its current usage and limit.
+class ARROW_EXPORT MemoryResource {
+ public:
+  explicit MemoryResource(MemoryLevel memory_level) : memory_level_{memory_level} {}
+
+  virtual ~MemoryResource() = default;
+
+  /// The level passed at construction.
+  MemoryLevel memory_level() const { return memory_level_; }
+
+  /// Name of memory_level().
+  std::string ToString() const;
+
+  /// Maximum usage allowed; presumably in bytes — confirm for each implementation.
+  virtual int64_t memory_limit() = 0;
+
+  /// Current usage, in the same unit as memory_limit().
+  virtual int64_t memory_used() = 0;
+
+ private:
+  MemoryLevel memory_level_;
+};
+
+class ARROW_EXPORT MemoryResources {
+ public:
+  ~MemoryResources();
+
+  static std::unique_ptr<MemoryResources> Make();
+
+  Status AddMemoryResource(std::shared_ptr<MemoryResource> resource);
+
+  size_t size() const;
+
+  Result<MemoryResource*> memory_resource(MemoryLevel level) const;
+
+  std::vector<MemoryResource*> memory_resources() const;

Review comment:
       How do you envision this method being used?

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -277,6 +277,26 @@ struct OrderBySinkNode final : public SinkNode {
   void InputReceived(ExecNode* input, ExecBatch batch) override {
     DCHECK_EQ(input, inputs_[0]);
 
+    auto resources = plan()->exec_context()->memory_resources();
+    auto resource = resources->memory_resource(MemoryLevel::kCpuLevel);
+    if (ErrorIfNotOk(resource.status())) {
+      StopProducing();
+      if (input_counter_.Cancel()) {
+        finished_.MarkFinished(resource.status());
+      }
+      return;
+    }
+
+    auto memory_resource = resource.ValueUnsafe();
+    auto memory_used = memory_resource->memory_used();
+    if (memory_used >= memory_resource->memory_limit()) {
+      StopProducing();
+      if (input_counter_.Cancel()) {
+        finished_.MarkFinished(Status::Invalid("Not enough memory resources"));
+      }
+      return;
+    }

Review comment:
       This seems like a lot of code that could potentially get repeated in a lot of places.  I suppose eventually this kind of logic will be moving into a scheduler?

##########
File path: cpp/src/arrow/compute/memory_resources.h
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+namespace compute {
+
+struct ExecBatch;
+
+/// Levels under which a MemoryResource can be registered; enumerators are numbered
+/// from 0 in declaration order.
+enum class MemoryLevel : int {
+  kGpuLevel,
+  kCpuLevel,
+  kDiskLevel,
+  /// Count of the levels above; not a level itself.
+  kNumLevels
+};
+
+/// Abstract resource at one MemoryLevel that reports its current usage and limit.
+class ARROW_EXPORT MemoryResource {
+ public:
+  explicit MemoryResource(MemoryLevel memory_level) : memory_level_{memory_level} {}
+
+  virtual ~MemoryResource() = default;
+
+  /// The level passed at construction.
+  MemoryLevel memory_level() const { return memory_level_; }
+
+  /// Name of memory_level().
+  std::string ToString() const;
+
+  /// Maximum usage allowed; presumably in bytes — confirm for each implementation.
+  virtual int64_t memory_limit() = 0;
+
+  /// Current usage, in the same unit as memory_limit().
+  virtual int64_t memory_used() = 0;
+
+ private:
+  MemoryLevel memory_level_;
+};
+
+class ARROW_EXPORT MemoryResources {
+ public:
+  ~MemoryResources();
+
+  static std::unique_ptr<MemoryResources> Make();
+
+  Status AddMemoryResource(std::shared_ptr<MemoryResource> resource);

Review comment:
       Can `MemoryResources` be immutable?  Perhaps we can pass in a vector of `MemoryResource` at construction time.  Is there a valid use case for modifying it after construction?

##########
File path: cpp/src/arrow/compute/memory_resources.cc
##########
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/memory_resources.h"
+
+#include <array>
+#include <memory>
+#include <mutex>
+#include <random>
+
+#include "arrow/compute/exec.h"
+#include "arrow/util/logging.h"
+
+#ifdef __APPLE__
+#include <sys/sysctl.h>
+#include <sys/types.h>
+#endif
+
+#ifdef __linux__
+#include <sys/statvfs.h>
+#include <sys/sysinfo.h>
+#endif
+
+// Windows APIs
+#include "arrow/util/windows_compatibility.h"
+
+namespace arrow {
+
+namespace compute {
+
+// Returns the qualified enumerator name (e.g. "MemoryLevel::kCpuLevel") for
+// `memory_level`, or "MemoryLevel::<unknown>" for any value without a name
+// (including MemoryLevel::kNumLevels).
+//
+// A switch is used instead of a table indexed by the enum's underlying value so the
+// mapping cannot drift from the enum's declaration order (kGpuLevel is 0).
+std::string MemoryLevelName(MemoryLevel memory_level) {
+  switch (memory_level) {
+    case MemoryLevel::kGpuLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kGpuLevel);
+    case MemoryLevel::kCpuLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kCpuLevel);
+    case MemoryLevel::kDiskLevel:
+      return ARROW_STRINGIFY(MemoryLevel::kDiskLevel);
+    default:
+      break;
+  }
+  return "MemoryLevel::<unknown>";
+}
+
+// Human-readable name of the level this resource was constructed for.
+std::string MemoryResource::ToString() const {
+  const MemoryLevel level = memory_level_;
+  return MemoryLevelName(level);
+}
+
+// Defined out of line; members release themselves.
+MemoryResources::~MemoryResources() = default;
+
+// Creates a new default-constructed registry owned by the caller. `new` is used
+// directly, presumably because the constructor is not public — confirm in the header.
+std::unique_ptr<MemoryResources> MemoryResources::Make() {
+  std::unique_ptr<MemoryResources> registry(new MemoryResources());
+  return registry;
+}
+
+// Registers `resource` in the slot for its memory level.
+//
+// Returns Status::Invalid if `resource` is null or reports a level outside the
+// registry's range (e.g. MemoryLevel::kNumLevels), and Status::KeyError if a resource
+// is already registered for that level.
+Status MemoryResources::AddMemoryResource(std::shared_ptr<MemoryResource> resource) {
+  if (resource == nullptr) {
+    return Status::Invalid("Cannot register a null memory resource");
+  }
+  auto level = static_cast<size_t>(resource->memory_level());
+  // stats_ is indexed directly by level with unchecked operator[]; reject bad levels.
+  if (level >= stats_.size()) {
+    return Status::Invalid("Memory resource has out-of-range level: ",
+                           static_cast<int>(resource->memory_level()));
+  }
+  if (stats_[level] != nullptr) {
+    return Status::KeyError("Already have a resource type registered with name: ",
+                            resource->ToString());
+  }
+  stats_[level] = std::move(resource);
+  return Status::OK();
+}
+
+// NOTE(review): returns stats_.size(), which (if stats_ is a fixed-size array, as the
+// indexing elsewhere suggests) is the number of slots, not the number of registered
+// resources — memory_resources() skips empty slots. Confirm what callers expect.
+size_t MemoryResources::size() const {
+  return stats_.size();
+}
+
+// Returns a non-owning pointer to the resource registered for `memory_level`.
+//
+// Returns Status::Invalid for a level outside the registry's range (e.g.
+// MemoryLevel::kNumLevels) and Status::KeyError if nothing is registered for it.
+Result<MemoryResource*> MemoryResources::memory_resource(MemoryLevel memory_level) const {
+  auto level = static_cast<size_t>(memory_level);
+  // Guard the unchecked operator[] below.
+  if (level >= stats_.size()) {
+    return Status::Invalid("Memory level out of range: ",
+                           static_cast<int>(memory_level));
+  }
+  if (stats_[level] == nullptr) {
+    return Status::KeyError("No memory resource registered with level: ",
+                            MemoryLevelName(memory_level));
+  }
+  return stats_[level].get();
+}
+
+// Returns non-owning pointers to every registered resource in slot (level) order,
+// skipping levels with nothing registered. Ownership stays with this registry.
+std::vector<MemoryResource*> MemoryResources::memory_resources() const {
+  std::vector<MemoryResource*> registered;
+  for (const auto& slot : stats_) {
+    if (slot) {
+      registered.push_back(slot.get());
+    }
+  }
+  return registered;
+}
+
+namespace {
+
+size_t GetTotalMemorySize() {

Review comment:
       Can you add a comment explaining what is being returned here?  Does this number include swap, for example?  Or is it based on physical RAM?  Are there any caveats to be aware of?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org