Posted to commits@mxnet.apache.org by ha...@apache.org on 2018/03/27 20:30:13 UTC
[incubator-mxnet] branch master updated: Iter (#198) (#10124)
This is an automated email from the ASF dual-hosted git repository.
haibin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push:
new e08e1fd Iter (#198) (#10124)
e08e1fd is described below
commit e08e1fdaa26cb05fac4c152fc662e77130e8db35
Author: Haibin Lin <li...@gmail.com>
AuthorDate: Tue Mar 27 13:30:02 2018 -0700
Iter (#198) (#10124)
* fix a bug in the sparse batch loader
* fix a warning
* fix a bug when size = 0
* cache whether each tensor is an indptr array
---
src/io/iter_sparse_batchloader.h | 178 +++++++++++++++++++++++++--------------
tests/python/unittest/test_io.py | 13 ++-
2 files changed, 123 insertions(+), 68 deletions(-)
diff --git a/src/io/iter_sparse_batchloader.h b/src/io/iter_sparse_batchloader.h
index d5c9bd2..398d6e0 100644
--- a/src/io/iter_sparse_batchloader.h
+++ b/src/io/iter_sparse_batchloader.h
@@ -68,53 +68,36 @@ class SparseBatchLoader : public BatchLoader, public SparseIIterator<TBlobBatch>
// if we overflowed in the previous round, return false directly until BeforeFirst() is called
if (num_overflow_ != 0) return false;
index_t top = 0;
- inst_cache_.clear();
+ offsets_.clear();
while (sparse_base_->Next()) {
- inst_cache_.emplace_back(sparse_base_->Value());
- if (inst_cache_.size() >= param_.batch_size) break;
- }
- // no more data instance
- if (inst_cache_.size() == 0) {
- return false;
+ const DataInst& inst = sparse_base_->Value();
+ // initialize the data buffer, only called once
+ if (data_.size() == 0) this->InitData(inst);
+ // initialize the number of elements in each buffer, called once per batch
+ if (offsets_.size() == 0) offsets_.resize(inst.data.size(), 0);
+ CopyData(inst, top);
+ if (++top >= param_.batch_size) {
+ SetOutputShape();
+ return true;
+ }
}
- if (inst_cache_.size() < param_.batch_size) {
- CHECK_GT(param_.round_batch, 0);
+ if (top != 0) {
+ CHECK_NE(param_.round_batch, 0)
+ << "round_batch = False is not supported for sparse data iterator";
num_overflow_ = 0;
sparse_base_->BeforeFirst();
- for (; inst_cache_.size() < param_.batch_size; ++num_overflow_) {
+ for (; top < param_.batch_size; ++top, ++num_overflow_) {
CHECK(sparse_base_->Next()) << "number of inputs must be larger than the batch size";
- inst_cache_.emplace_back(sparse_base_->Value());
- }
- }
- out_.num_batch_padd = num_overflow_;
- CHECK_EQ(inst_cache_.size(), param_.batch_size);
- this->InitDataFromBatch();
- for (size_t j = 0; j < inst_cache_.size(); j++) {
- const auto& d = inst_cache_[j];
- out_.inst_index[top] = d.index;
- // TODO(haibin) double check the type?
- int64_t unit_size = 0;
- for (size_t i = 0; i < d.data.size(); ++i) {
- // indptr tensor
- if (IsIndPtr(i)) {
- auto indptr = data_[i].get<cpu, 1, int64_t>();
- if (j == 0) indptr[0] = 0;
- indptr[j + 1] = indptr[j] + unit_size;
- offsets_[i] = j;
- } else {
- // indices and values tensor
- unit_size = d.data[i].shape_.Size();
- MSHADOW_TYPE_SWITCH(data_[i].type_flag_, DType, {
- const auto begin = offsets_[i];
- const auto end = offsets_[i] + unit_size;
- mshadow::Copy(data_[i].get<cpu, 1, DType>().Slice(begin, end),
- d.data[i].get_with_shape<cpu, 1, DType>(mshadow::Shape1(unit_size)));
- });
- offsets_[i] += unit_size;
- }
+ const DataInst& inst = sparse_base_->Value();
+ // copy data
+ CopyData(inst, top);
}
+ SetOutputShape();
+ out_.num_batch_padd = num_overflow_;
+ return true;
}
- return true;
+ // no more data instances
+ return false;
}
virtual const TBlobBatch &Value(void) const {
@@ -138,14 +121,16 @@ class SparseBatchLoader : public BatchLoader, public SparseIIterator<TBlobBatch>
private:
/*! \brief base sparse iterator */
SparseIIterator<DataInst> *sparse_base_;
- /*! \brief data instances */
- std::vector<DataInst> inst_cache_;
/*! \brief data storage type */
NDArrayStorageType data_stype_;
/*! \brief data label type */
NDArrayStorageType label_stype_;
- /*! \brief tensor offset for slicing */
+ /*! \brief tensor offsets for slicing */
std::vector<size_t> offsets_;
+ /*! \brief tensor dtypes */
+ std::vector<int> dtypes_;
+ /*! \brief whether the i-th tensor is an indptr array */
+ std::vector<bool> indptr_;
// check whether the i-th position is the indptr tensor for a CSR tensor
inline bool IsIndPtr(size_t i) {
@@ -157,44 +142,109 @@ class SparseBatchLoader : public BatchLoader, public SparseIIterator<TBlobBatch>
return true;
}
// label indptr
- if (i == label_indptr_offset && label_stype_ == kCSRStorage && data_stype_ == kCSRStorage) {
+ if (i == label_indptr_offset && label_stype_ == kCSRStorage &&
+ data_stype_ == kCSRStorage) {
return true;
}
return false;
}
// initialize the data holder using the first instance of the batch
- inline void InitDataFromBatch() {
+ inline void InitData(const DataInst& first_inst) {
CHECK(data_stype_ == kCSRStorage || label_stype_ == kCSRStorage);
- CHECK_GT(inst_cache_.size(), 0);
out_.data.clear();
data_.clear();
offsets_.clear();
-
- size_t total_size = inst_cache_[0].data.size();
- data_.resize(total_size);
- offsets_.resize(total_size, 0);
- std::vector<size_t> vec_sizes(total_size, 0);
- // accumulate the memory required for a batch
- for (size_t i = 0; i < total_size; ++i) {
- size_t size = 0;
- // vec_size for indptr
+ indptr_.clear();
+
+ // num_arrays is the number of arrays in the inputs;
+ // if both data and label are in CSR format,
+ // num_arrays will be 3 + 3 = 6.
+ size_t num_arrays = first_inst.data.size();
+ data_.resize(num_arrays);
+ offsets_.resize(num_arrays, 0);
+ indptr_.resize(num_arrays, false);
+ // tensor buffer sizes
+ std::vector<size_t> buff_sizes(num_arrays, 0);
+ dtypes_.resize(num_arrays);
+ out_.data.resize(num_arrays);
+ // estimate the memory required for a batch
+ for (size_t i = 0; i < num_arrays; ++i) {
+ // buffer size for indptr
if (IsIndPtr(i)) {
- size = param_.batch_size + 1;
+ buff_sizes[i] = param_.batch_size + 1;
+ indptr_[i] = true;
} else {
- for (const auto &d : inst_cache_) size += d.data[i].shape_.Size();
+ // estimate the size for the whole batch based on the first instance
+ buff_sizes[i] = first_inst.data[i].Size() * param_.batch_size;
+ indptr_[i] = false;
}
- vec_sizes[i] = size;
+ dtypes_[i] = first_inst.data[i].type_flag_;
}
- CHECK_EQ(vec_sizes[0], vec_sizes[1]);
- for (size_t i = 0; i < total_size; ++i) {
- int src_type_flag = inst_cache_[0].data[i].type_flag_;
+ CHECK_EQ(buff_sizes[0], buff_sizes[1]);
+ // allocate buffer
+ for (size_t i = 0; i < num_arrays; ++i) {
// init object attributes
- TShape dst_shape(mshadow::Shape1(vec_sizes[i]));
- data_[i].resize(mshadow::Shape1(vec_sizes[i]), src_type_flag);
+ TShape dst_shape(mshadow::Shape1(buff_sizes[i]));
+ data_[i].resize(mshadow::Shape1(buff_sizes[i]), dtypes_[i]);
CHECK(data_[i].dptr_ != nullptr);
- out_.data.push_back(TBlob(data_[i].dptr_, dst_shape, cpu::kDevMask, src_type_flag));
+ }
+ }
+
+ /*! \brief set the shape of the outputs based on the actual data sizes */
+ inline void SetOutputShape() {
+ for (size_t i = 0; i < out_.data.size(); i++) {
+ out_.data[i] = TBlob(data_[i].dptr_, mshadow::Shape1(offsets_[i]),
+ Context::kCPU, dtypes_[i]);
+ }
+ }
+
+ /*! \brief grow the i-th data buffer (roughly doubling its capacity) while retaining its content */
+ inline void ResizeBuffer(size_t src_size, size_t i) {
+ MSHADOW_TYPE_SWITCH(data_[i].type_flag_, DType, {
+ TBlobContainer temp;
+ temp.resize(mshadow::Shape1(src_size), dtypes_[i]);
+ mshadow::Copy(temp.get<cpu, 1, DType>(), data_[i].get<cpu, 1, DType>().Slice(0, src_size));
+ // increase the size of space exponentially
+ size_t capacity = data_[i].Size();
+ capacity = capacity * 2 + 1;
+ data_[i] = TBlobContainer();
+ data_[i].resize(mshadow::Shape1(capacity), dtypes_[i]);
+ // copy back
+ mshadow::Copy(data_[i].get<cpu, 1, DType>().Slice(0, src_size), temp.get<cpu, 1, DType>());
+ });
+ }
+
+ /*! \brief copy the data instance to the data buffer */
+ void CopyData(const DataInst& inst, const size_t top) {
+ int64_t unit_size = 0;
+ out_.inst_index[top] = inst.index;
+ for (size_t i = 0; i < inst.data.size(); ++i) {
+ if (!indptr_[i]) {
+ // indices and values tensor
+ unit_size = inst.data[i].shape_.Size();
+ MSHADOW_TYPE_SWITCH(data_[i].type_flag_, DType, {
+ const size_t begin = offsets_[i];
+ const size_t end = offsets_[i] + unit_size;
+ size_t capacity = data_[i].Size();
+ // resize the data buffer if the estimated space is insufficient
+ while (capacity < end) {
+ ResizeBuffer(begin, i);
+ capacity = data_[i].Size();
+ }
+ mshadow::Copy(data_[i].get<cpu, 1, DType>().Slice(begin, end),
+ inst.data[i].get_with_shape<cpu, 1, DType>(mshadow::Shape1(unit_size)));
+ });
+ offsets_[i] += unit_size;
+ } else {
+ // indptr tensor
+ auto indptr = data_[i].get<cpu, 1, int64_t>();
+ // initialize the first indptr entry, which is always 0
+ if (top == 0) indptr[0] = 0;
+ indptr[top + 1] = indptr[top] + unit_size;
+ offsets_[i] = top + 2;
+ }
}
}
}; // class SparseBatchLoader
diff --git a/tests/python/unittest/test_io.py b/tests/python/unittest/test_io.py
index 4e23a22..e986ae7 100644
--- a/tests/python/unittest/test_io.py
+++ b/tests/python/unittest/test_io.py
@@ -219,7 +219,9 @@ def test_LibSVMIter():
i = 0
for batch in iter(data_train):
expected = first.asnumpy() if i == 0 else second.asnumpy()
- assert_almost_equal(data_train.getdata().asnumpy(), expected)
+ data = data_train.getdata()
+ data.check_format(True)
+ assert_almost_equal(data.asnumpy(), expected)
i += 1
def check_libSVMIter_news_data():
@@ -227,7 +229,7 @@ def test_LibSVMIter():
'name': 'news20.t',
'origin_name': 'news20.t.bz2',
'url': "https://apache-mxnet.s3-accelerate.dualstack.amazonaws.com/gluon/dataset/news20.t.bz2",
- 'feature_dim': 62060,
+ 'feature_dim': 62060 + 1,
'num_classes': 20,
'num_examples': 3993,
}
@@ -243,8 +245,11 @@ def test_LibSVMIter():
num_batches = 0
for batch in data_train:
# check the range of labels
- assert(np.sum(batch.label[0].asnumpy() > 20) == 0)
- assert(np.sum(batch.label[0].asnumpy() <= 0) == 0)
+ data = batch.data[0]
+ label = batch.label[0]
+ data.check_format(True)
+ assert(np.sum(label.asnumpy() > 20) == 0)
+ assert(np.sum(label.asnumpy() <= 0) == 0)
num_batches += 1
expected_num_batches = num_examples / batch_size
assert(num_batches == int(expected_num_batches)), num_batches
--
To stop receiving notification emails like this one, please contact
haibin@apache.org.