You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@doris.apache.org by GitBox <gi...@apache.org> on 2019/04/12 01:54:07 UTC

[GitHub] [incubator-doris] kangpinghuang commented on a change in pull request #909: Fix wrong version set in TabletMeta and rowset null pointer

kangpinghuang commented on a change in pull request #909: Fix wrong version set in TabletMeta and rowset null pointer
URL: https://github.com/apache/incubator-doris/pull/909#discussion_r274733949
 
 

 ##########
 File path: be/src/olap/push_handler.cpp
 ##########
 @@ -241,175 +241,174 @@ void PushHandler::_get_tablet_infos(const vector<TabletVars>& tablet_vars,
   }
 }
 
-OLAPStatus PushHandler::_convert(TabletSharedPtr curr_tablet,
+OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet,
                                  TabletSharedPtr new_tablet,
                                  RowsetSharedPtr* cur_rowset,
                                  RowsetSharedPtr* new_rowset) {
-  OLAPStatus res = OLAP_SUCCESS;
-  RowCursor row;
-  BinaryFile raw_file;
-  IBinaryReader* reader = NULL;
-  RowsetWriterSharedPtr rowset_writer(new AlphaRowsetWriter());
-  if (rowset_writer == nullptr) {
-    LOG(WARNING) << "new rowset writer failed.";
-    return OLAP_ERR_MALLOC_ERROR;
-  }
-  RowsetWriterContext context;
-  uint32_t num_rows = 0;
-  RowsetId rowset_id = 0;
-  res = curr_tablet->next_rowset_id(&rowset_id);
-  if (res != OLAP_SUCCESS) {
-    LOG(WARNING) << "generate rowset id failed, res:" << res;
-    return OLAP_ERR_ROWSET_GENERATE_ID_FAILED;
-  }
-  PUniqueId load_id;
-  load_id.set_hi(0);
-  load_id.set_lo(0);
-
-  do {
-    VLOG(3) << "start to convert delta file.";
-
-    // 1. Init BinaryReader to read raw file if exist,
-    //    in case of empty push and delete data, this will be skipped.
-    if (_request.__isset.http_file_path) {
-      // open raw file
-      if (OLAP_SUCCESS !=
-          (res = raw_file.init(_request.http_file_path.c_str()))) {
-        OLAP_LOG_WARNING("failed to read raw file. [res=%d file='%s']", res,
-                         _request.http_file_path.c_str());
-        res = OLAP_ERR_INPUT_PARAMETER_ERROR;
-        break;
-      }
-
-      // create BinaryReader
-      bool need_decompress = false;
-      if (_request.__isset.need_decompress && _request.need_decompress) {
-        need_decompress = true;
-      }
-      if (NULL == (reader = IBinaryReader::create(need_decompress))) {
-        OLAP_LOG_WARNING("fail to create reader. [tablet='%s' file='%s']",
-                         curr_tablet->full_name().c_str(),
-                         _request.http_file_path.c_str());
-        res = OLAP_ERR_MALLOC_ERROR;
-        break;
-      }
-
-      // init BinaryReader
-      if (OLAP_SUCCESS != (res = reader->init(curr_tablet, &raw_file))) {
-        OLAP_LOG_WARNING("fail to init reader. [res=%d tablet='%s' file='%s']",
-                         res, curr_tablet->full_name().c_str(),
-                         _request.http_file_path.c_str());
-        res = OLAP_ERR_PUSH_INIT_ERROR;
-        break;
-      }
+    OLAPStatus res = OLAP_SUCCESS;
+    RowCursor row;
+    BinaryFile raw_file;
+    IBinaryReader* reader = NULL;
+    RowsetWriterSharedPtr rowset_writer(new AlphaRowsetWriter());
+    if (rowset_writer == nullptr) {
+        LOG(WARNING) << "new rowset writer failed.";
+        return OLAP_ERR_MALLOC_ERROR;
     }
-
-    // 2. init RowsetBuilder of cur_tablet for current push
-    VLOG(3) << "init RowsetBuilder.";
     RowsetWriterContext context;
-    context.rowset_id = rowset_id;
-    context.tablet_id = curr_tablet->tablet_id();
-    context.partition_id = _request.partition_id;
-    context.tablet_schema_hash = curr_tablet->schema_hash();
-    context.rowset_type = ALPHA_ROWSET;
-    context.rowset_path_prefix = curr_tablet->tablet_path();
-    context.tablet_schema = &(curr_tablet->tablet_schema());
-    context.rowset_state = PREPARED;
-    context.data_dir = curr_tablet->data_dir();
-    context.txn_id = _request.transaction_id;
-    context.load_id = load_id;
-    rowset_writer->init(context);
-
-    // 3. New RowsetBuilder to write data into rowset
-    VLOG(3) << "init rowset builder. tablet=" << curr_tablet->full_name()
-            << ", block_row_size=" << curr_tablet->num_rows_per_row_block();
-
-    // 4. Init RowCursor
-    if (OLAP_SUCCESS != (res = row.init(curr_tablet->tablet_schema()))) {
-      LOG(WARNING) << "fail to init rowcursor. res=" << res;
-      break;
+    uint32_t num_rows = 0;
+    RowsetId rowset_id = 0;
+    res = cur_tablet->next_rowset_id(&rowset_id);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "generate rowset id failed, res:" << res;
+        return OLAP_ERR_ROWSET_GENERATE_ID_FAILED;
     }
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+
+    do {
+        VLOG(3) << "start to convert delta file.";
+
+        // 1. Init BinaryReader to read raw file if exist,
+        //    in case of empty push and delete data, this will be skipped.
+        if (_request.__isset.http_file_path) {
+            // open raw file
+            if (OLAP_SUCCESS !=
+                    (res = raw_file.init(_request.http_file_path.c_str()))) {
+                LOG(WARNING) << "failed to read raw file. res=" << res
+                             << ", file=" << _request.http_file_path;
+                res = OLAP_ERR_INPUT_PARAMETER_ERROR;
+                break;
+            }
+
+            // create BinaryReader
+            bool need_decompress = false;
+            if (_request.__isset.need_decompress && _request.need_decompress) {
+                need_decompress = true;
+            }
+            if (NULL == (reader = IBinaryReader::create(need_decompress))) {
+                LOG(WARNING) << "fail to create reader. tablet=" << cur_tablet->full_name()
+                             << ", file=" << _request.http_file_path;
+                res = OLAP_ERR_MALLOC_ERROR;
+                break;
+            }
+
+            // init BinaryReader
+            if (OLAP_SUCCESS != (res = reader->init(cur_tablet, &raw_file))) {
+                LOG(WARNING) << "fail to init reader. res=" << res
+                             << ", tablet=" << cur_tablet->full_name()
+                             << ", file=" << _request.http_file_path;
+                res = OLAP_ERR_PUSH_INIT_ERROR;
+                break;
+            }
+        }
 
-    // 5. Read data from raw file and write into SegmentGroup of curr_tablet
-    if (_request.__isset.http_file_path) {
-      // Convert from raw to delta
-      VLOG(3) << "start to convert row file to delta.";
-      while (!reader->eof()) {
-        res = reader->next(&row, rowset_writer->mem_pool());
-        if (OLAP_SUCCESS != res) {
-          LOG(WARNING) << "read next row failed."
-                       << " res=" << res << " read_rows=" << num_rows;
-          break;
-        } else {
-          if (OLAP_SUCCESS != (res = rowset_writer->add_row(&row))) {
-            LOG(WARNING) << "fail to attach row to rowset_writer. "
-                         << " res=" << res
-                         << ", tablet=" << curr_tablet->full_name()
-                         << " read_rows=" << num_rows;
+        // 2. init RowsetBuilder of cur_tablet for current push
+        VLOG(3) << "init RowsetBuilder.";
+        RowsetWriterContext context;
+        context.rowset_id = rowset_id;
+        context.tablet_id = cur_tablet->tablet_id();
+        context.partition_id = _request.partition_id;
+        context.tablet_schema_hash = cur_tablet->schema_hash();
+        context.rowset_type = ALPHA_ROWSET;
+        context.rowset_path_prefix = cur_tablet->tablet_path();
+        context.tablet_schema = &(cur_tablet->tablet_schema());
+        context.rowset_state = PREPARED;
+        context.data_dir = cur_tablet->data_dir();
+        context.txn_id = _request.transaction_id;
+        context.load_id = load_id;
+        rowset_writer->init(context);
+
+        // 3. New RowsetBuilder to write data into rowset
+        VLOG(3) << "init rowset builder. tablet=" << cur_tablet->full_name()
+            << ", block_row_size=" << cur_tablet->num_rows_per_row_block();
+
+        // 4. Init RowCursor
+        if (OLAP_SUCCESS != (res = row.init(cur_tablet->tablet_schema()))) {
+            LOG(WARNING) << "fail to init rowcursor. res=" << res;
             break;
-          }
-          num_rows++;
         }
-      }
-
-      reader->finalize();
 
-      if (!reader->validate_checksum()) {
-        LOG(WARNING) << "pushed delta file has wrong checksum.";
-        res = OLAP_ERR_PUSH_BUILD_DELTA_ERROR;
-        break;
-      }
-    }
-
-    if (rowset_writer->flush() != OLAP_SUCCESS) {
-      LOG(WARNING) << "failed to finalize writer.";
-      break;
-    }
-    *cur_rowset = rowset_writer->build();
+        // 5. Read data from raw file and write into SegmentGroup of cur_tablet
+        if (_request.__isset.http_file_path) {
+            // Convert from raw to delta
+            VLOG(3) << "start to convert row file to delta.";
+            while (!reader->eof()) {
+                res = reader->next(&row, rowset_writer->mem_pool());
+                if (OLAP_SUCCESS != res) {
+                    LOG(WARNING) << "read next row failed."
+                        << " res=" << res << " read_rows=" << num_rows;
+                    break;
+                } else {
+                    if (OLAP_SUCCESS != (res = rowset_writer->add_row(&row))) {
+                        LOG(WARNING) << "fail to attach row to rowset_writer. "
+                            << " res=" << res
+                            << ", tablet=" << cur_tablet->full_name()
+                            << " read_rows=" << num_rows;
+                        break;
+                    }
+                    num_rows++;
+                }
+            }
+
+            reader->finalize();
+
+            if (!reader->validate_checksum()) {
+                LOG(WARNING) << "pushed delta file has wrong checksum.";
+                res = OLAP_ERR_PUSH_BUILD_DELTA_ERROR;
+                break;
+            }
+        }
 
-    if (*cur_rowset == nullptr) {
-      LOG(WARNING) << "fail to build rowset";
-      res = OLAP_ERR_MALLOC_ERROR;
-      break;
-    }
+        if (rowset_writer->flush() != OLAP_SUCCESS) {
+            LOG(WARNING) << "failed to finalize writer.";
+            break;
+        }
+        *cur_rowset = rowset_writer->build();
 
-    _write_bytes += (*cur_rowset)->data_disk_size();
-    _write_rows += (*cur_rowset)->num_rows();
+        if (*cur_rowset == nullptr) {
+            LOG(WARNING) << "fail to build rowset";
+            res = OLAP_ERR_MALLOC_ERROR;
+            break;
+        }
 
-    // 7. Convert data for schema change tables
-    VLOG(10) << "load to related tables of schema_change if possible.";
-    if (new_tablet != nullptr) {
-      SchemaChangeHandler schema_change;
-      res = schema_change.schema_version_convert(curr_tablet, new_tablet,
-                                                 cur_rowset, new_rowset);
-      if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "failed to change schema version for delta."
-                     << "[res=" << res << " new_tablet='"
-                     << new_tablet->full_name() << "']";
-      }
-    }
-  } while (0);
+        _write_bytes += (*cur_rowset)->data_disk_size();
+        _write_rows += (*cur_rowset)->num_rows();
+
+        // 7. Convert data for schema change tables
+        VLOG(10) << "load to related tables of schema_change if possible.";
+        if (new_tablet != nullptr) {
+            SchemaChangeHandler schema_change;
+            res = schema_change.schema_version_convert(cur_tablet, new_tablet,
+                    cur_rowset, new_rowset);
+            if (res != OLAP_SUCCESS) {
+                LOG(WARNING) << "failed to change schema version for delta."
+                    << "[res=" << res << " new_tablet='"
+                    << new_tablet->full_name() << "']";
+            }
+        }
+    } while (0);
 
-  SAFE_DELETE(reader);
-  OLAP_LOG_NOTICE_PUSH("processed_rows", "%d", num_rows);
-  VLOG(10) << "convert delta file end. res=" << res
-           << ", tablet=" << curr_tablet->full_name();
-  return res;
+    SAFE_DELETE(reader);
+    VLOG(10) << "convert delta file end. res=" << res
+             << ", tablet=" << cur_tablet->full_name()
+             << ", processed_rows" << num_rows;
+    return res;
 }
 
 OLAPStatus BinaryFile::init(const char* path) {
-  // open file
-  if (OLAP_SUCCESS != open(path, "rb")) {
-    OLAP_LOG_WARNING("fail to open file. [file='%s']", path);
-    return OLAP_ERR_IO_ERROR;
-  }
+    // open file
+    if (OLAP_SUCCESS != open(path, "rb")) {
+        OLAP_LOG_WARNING("fail to open file. [file='%s']", path);
 
 Review comment:
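  Please use LOG(WARNING) here instead of OLAP_LOG_WARNING, to be consistent with the other log statements converted in this change.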

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@doris.apache.org
For additional commands, e-mail: dev-help@doris.apache.org