You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by GitBox <gi...@apache.org> on 2017/12/05 05:36:24 UTC

[GitHub] eric-haibin-lin closed pull request #8922: fix a bug in sparse batch loader when batch size is extremely large

eric-haibin-lin closed pull request #8922: fix a bug in sparse batch loader when batch size is extremely large
URL: https://github.com/apache/incubator-mxnet/pull/8922
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request originates from a fork, GitHub would not otherwise
show its diff after the merge, so the diff is supplied below:

diff --git a/src/io/iter_sparse_batchloader.h b/src/io/iter_sparse_batchloader.h
index d5c9bd2f45..e5e9c1fe38 100644
--- a/src/io/iter_sparse_batchloader.h
+++ b/src/io/iter_sparse_batchloader.h
@@ -68,53 +68,36 @@ class SparseBatchLoader : public BatchLoader, public SparseIIterator<TBlobBatch>
     // if overflown from previous round, directly return false, until before first is called
     if (num_overflow_ != 0) return false;
     index_t top = 0;
-    inst_cache_.clear();
+    offsets_.clear();
     while (sparse_base_->Next()) {
-      inst_cache_.emplace_back(sparse_base_->Value());
-      if (inst_cache_.size() >= param_.batch_size) break;
-    }
-    // no more data instance
-    if (inst_cache_.size() == 0) {
-      return false;
+      const DataInst& inst = sparse_base_->Value();
+      // initialize the data buffer, only called once
+      if (data_.size() == 0) this->InitData(inst);
+      // initialize the number of elements in each buffer, called once per batch
+      if (offsets_.size() == 0) offsets_.resize(inst.data.size(), 0);
+      CopyData(inst, top);
+      if (++top >= param_.batch_size) {
+        SetOutputShape();
+        return true;
+      }
     }
-    if (inst_cache_.size() < param_.batch_size) {
-      CHECK_GT(param_.round_batch, 0);
+    if (top != 0) {
+      CHECK_NE(param_.round_batch, 0)
+        << "round_batch = False is not supported for sparse data iterator";
       num_overflow_ = 0;
       sparse_base_->BeforeFirst();
-      for (; inst_cache_.size() < param_.batch_size; ++num_overflow_) {
+      for (; top < param_.batch_size; ++top, ++num_overflow_) {
         CHECK(sparse_base_->Next()) << "number of input must be bigger than batch size";
-        inst_cache_.emplace_back(sparse_base_->Value());
-      }
-    }
-    out_.num_batch_padd = num_overflow_;
-    CHECK_EQ(inst_cache_.size(), param_.batch_size);
-    this->InitDataFromBatch();
-    for (size_t j = 0; j < inst_cache_.size(); j++) {
-      const auto& d = inst_cache_[j];
-      out_.inst_index[top] = d.index;
-      // TODO(haibin) double check the type?
-      int64_t unit_size = 0;
-      for (size_t i = 0; i < d.data.size(); ++i) {
-        // indptr tensor
-        if (IsIndPtr(i)) {
-          auto indptr = data_[i].get<cpu, 1, int64_t>();
-          if (j == 0) indptr[0] = 0;
-          indptr[j + 1] = indptr[j] + unit_size;
-          offsets_[i] = j;
-        } else {
-          // indices and values tensor
-          unit_size = d.data[i].shape_.Size();
-          MSHADOW_TYPE_SWITCH(data_[i].type_flag_, DType, {
-            const auto begin = offsets_[i];
-            const auto end = offsets_[i] + unit_size;
-            mshadow::Copy(data_[i].get<cpu, 1, DType>().Slice(begin, end),
-                          d.data[i].get_with_shape<cpu, 1, DType>(mshadow::Shape1(unit_size)));
-            });
-          offsets_[i] += unit_size;
-        }
+        const DataInst& inst = sparse_base_->Value();
+        // copy data
+        CopyData(inst, top);
       }
+      SetOutputShape();
+      out_.num_batch_padd = num_overflow_;
+      return true;
     }
-    return true;
+    // no more data instance
+    return false;
   }
 
   virtual const TBlobBatch &Value(void) const {
@@ -138,14 +121,14 @@ class SparseBatchLoader : public BatchLoader, public SparseIIterator<TBlobBatch>
  private:
   /*! \brief base sparse iterator */
   SparseIIterator<DataInst> *sparse_base_;
-  /*! \brief data instances */
-  std::vector<DataInst> inst_cache_;
   /*! \brief data storage type */
   NDArrayStorageType data_stype_;
   /*! \brief data label type */
   NDArrayStorageType label_stype_;
-  /*! \brief tensor offset for slicing */
+  /*! \brief tensor offsets for slicing */
   std::vector<size_t> offsets_;
+  /*! \brief tensor dtypes */
+  std::vector<int> dtypes_;
 
   // check whether ith position is the indptr tensor for a CSR tensor
   inline bool IsIndPtr(size_t i) {
@@ -157,44 +140,100 @@ class SparseBatchLoader : public BatchLoader, public SparseIIterator<TBlobBatch>
       return true;
     }
     // label indptr
-    if (i == label_indptr_offset && label_stype_ == kCSRStorage && data_stype_ == kCSRStorage) {
+    if (i == label_indptr_offset && label_stype_ == kCSRStorage &&
+        data_stype_ == kCSRStorage) {
       return true;
     }
     return false;
   }
 
   // initialize the data holder by using from the batch
-  inline void InitDataFromBatch() {
+  inline void InitData(const DataInst& first_batch) {
     CHECK(data_stype_ == kCSRStorage || label_stype_ == kCSRStorage);
-    CHECK_GT(inst_cache_.size(), 0);
     out_.data.clear();
     data_.clear();
     offsets_.clear();
 
-    size_t total_size = inst_cache_[0].data.size();
+    size_t total_size = first_batch.data.size();
     data_.resize(total_size);
     offsets_.resize(total_size, 0);
-    std::vector<size_t> vec_sizes(total_size, 0);
-    // accumulate the memory required for a batch
+    // tensor buffer sizes
+    std::vector<size_t> buff_sizes(total_size, 0);
+    dtypes_.resize(total_size);
+    out_.data.resize(total_size);
+    // estimate the memory required for a batch
     for (size_t i = 0; i < total_size; ++i) {
       size_t size = 0;
-      // vec_size for indptr
+      // shape for indptr
       if (IsIndPtr(i)) {
-        size = param_.batch_size + 1;
+        buff_sizes[i] = param_.batch_size + 1;
       } else {
-        for (const auto &d : inst_cache_) size += d.data[i].shape_.Size();
+        // estimated the size for the whole batch based on the first instance
+        buff_sizes[i] = first_batch.data[i].Size() * param_.batch_size;
       }
-      vec_sizes[i] = size;
+      dtypes_[i] = first_batch.data[i].type_flag_;
     }
 
-    CHECK_EQ(vec_sizes[0], vec_sizes[1]);
+    CHECK_EQ(buff_sizes[0], buff_sizes[1]);
+    // allocate buffer
     for (size_t i = 0; i < total_size; ++i) {
-      int src_type_flag = inst_cache_[0].data[i].type_flag_;
       // init object attributes
-      TShape dst_shape(mshadow::Shape1(vec_sizes[i]));
-      data_[i].resize(mshadow::Shape1(vec_sizes[i]), src_type_flag);
+      TShape dst_shape(mshadow::Shape1(buff_sizes[i]));
+      data_[i].resize(mshadow::Shape1(buff_sizes[i]), dtypes_[i]);
       CHECK(data_[i].dptr_ != nullptr);
-      out_.data.push_back(TBlob(data_[i].dptr_, dst_shape, cpu::kDevMask, src_type_flag));
+    }
+  }
+
+  /* \brief set the shape of the outputs based on actual shapes */
+  inline void SetOutputShape() {
+    for (size_t i = 0; i < out_.data.size(); i++) {
+      out_.data[i] = TBlob(data_[i].dptr_, mshadow::Shape1(offsets_[i]),
+                           Context::kCPU, dtypes_[i]);
+    }
+  }
+
+  /* \brief increase the size of i-th data buffer by a factor of 2, while retaining the content */
+  inline void ResizeBuffer(size_t src_size, size_t i) {
+    MSHADOW_TYPE_SWITCH(data_[i].type_flag_, DType, {
+      TBlobContainer temp;
+      temp.resize(mshadow::Shape1(src_size), dtypes_[i]);
+      mshadow::Copy(temp.get<cpu, 1, DType>(), data_[i].get<cpu, 1, DType>().Slice(0, src_size));
+      // increase the size of space exponentially
+      size_t capacity = data_[i].Size();
+      capacity *= 2;
+      data_[i] = TBlobContainer();
+      data_[i].resize(mshadow::Shape1(capacity), dtypes_[i]);
+      // copy back
+      mshadow::Copy(data_[i].get<cpu, 1, DType>().Slice(0, src_size), temp.get<cpu, 1, DType>());
+    });
+  }
+
+  /* \brief copy the data instance to data buffer */
+  void CopyData(const DataInst& d, const size_t top) {
+    int64_t unit_size = 0;
+    out_.inst_index[top] = d.index;
+    for (size_t i = 0; i < d.data.size(); ++i) {
+      if (!IsIndPtr(i)) {
+        // indices and values tensor
+        unit_size = d.data[i].shape_.Size();
+        MSHADOW_TYPE_SWITCH(data_[i].type_flag_, DType, {
+          const size_t begin = offsets_[i];
+          const size_t end = offsets_[i] + unit_size;
+          const size_t capacity = data_[i].Size();
+          // resize the data buffer if estimated space is not sufficient
+          if (capacity < end) ResizeBuffer(begin, i);
+          mshadow::Copy(data_[i].get<cpu, 1, DType>().Slice(begin, end),
+                        d.data[i].get_with_shape<cpu, 1, DType>(mshadow::Shape1(unit_size)));
+        });
+        offsets_[i] += unit_size;
+      } else {
+        // indptr placeholder
+        auto indptr = data_[i].get<cpu, 1, int64_t>();
+        // initialize the first indptr, which is always 0
+        if (top == 0) indptr[0] = 0;
+        indptr[top + 1] = indptr[top] + unit_size;
+        offsets_[i] = top + 2;
+      }
     }
   }
 };  // class BatchLoader
diff --git a/tests/python/unittest/test_io.py b/tests/python/unittest/test_io.py
index fa314e0f8b..50ea904f29 100644
--- a/tests/python/unittest/test_io.py
+++ b/tests/python/unittest/test_io.py
@@ -214,7 +214,9 @@ def check_libSVMIter_synthetic():
         i = 0
         for batch in iter(data_train):
             expected = first.asnumpy() if i == 0 else second.asnumpy()
-            assert_almost_equal(data_train.getdata().asnumpy(), expected)
+            data = data_train.getdata()
+            data.check_format(True)
+            assert_almost_equal(data.asnumpy(), expected)
             i += 1
 
     def check_libSVMIter_news_data():
@@ -222,7 +224,7 @@ def check_libSVMIter_news_data():
             'name': 'news20.t',
             'origin_name': 'news20.t.bz2',
             'url': "http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/news20.t.bz2",
-            'feature_dim': 62060,
+            'feature_dim': 62060 + 1,
             'num_classes': 20,
             'num_examples': 3993,
         }
@@ -238,8 +240,11 @@ def check_libSVMIter_news_data():
             num_batches = 0
             for batch in data_train:
                 # check the range of labels
-                assert(np.sum(batch.label[0].asnumpy() > 20) == 0)
-                assert(np.sum(batch.label[0].asnumpy() <= 0) == 0)
+                data = batch.data[0]
+                label = batch.label[0]
+                data.check_format(True)
+                assert(np.sum(label.asnumpy() > 20) == 0)
+                assert(np.sum(label.asnumpy() <= 0) == 0)
                 num_batches += 1
             expected_num_batches = num_examples / batch_size
             assert(num_batches == int(expected_num_batches)), num_batches


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services