You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/09/14 07:56:57 UTC
[incubator-doris] branch master updated: support change column type from decimal to string (#6643)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 61c9d11 support change column type from decimal to string (#6643)
61c9d11 is described below
commit 61c9d11fdb72dccf94876ae3706e7ed492622807
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Tue Sep 14 15:56:44 2021 +0800
support change column type from decimal to string (#6643)
---
be/src/olap/rowset/segment_v2/encoding_info.cpp | 1 +
be/src/runtime/data_stream_recvr.cc | 5 +-
be/src/runtime/data_stream_sender.cpp | 65 +++++++++++-----------
be/src/runtime/row_batch.cpp | 12 ++--
be/src/runtime/row_batch.h | 8 +--
.../main/java/org/apache/doris/catalog/Column.java | 3 +-
6 files changed, 47 insertions(+), 47 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp
index 801cdb4..e4ce43c 100644
--- a/be/src/olap/rowset/segment_v2/encoding_info.cpp
+++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp
@@ -255,6 +255,7 @@ EncodingInfoResolver::EncodingInfoResolver() {
_add_map<OLAP_FIELD_TYPE_VARCHAR, PLAIN_ENCODING>();
_add_map<OLAP_FIELD_TYPE_VARCHAR, PREFIX_ENCODING, true>();
+ _add_map<OLAP_FIELD_TYPE_STRING, DICT_ENCODING>();
_add_map<OLAP_FIELD_TYPE_STRING, PLAIN_ENCODING>();
_add_map<OLAP_FIELD_TYPE_STRING, PREFIX_ENCODING, true>();
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index b836253..49b2495 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -213,7 +213,7 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n
_packet_seq_map.emplace(be_number, packet_seq);
}
- int batch_size = RowBatch::get_batch_size(pb_batch);
+ size_t batch_size = RowBatch::get_batch_size(pb_batch);
COUNTER_UPDATE(_recvr->_bytes_received_counter, batch_size);
// Following situation will match the following condition.
@@ -446,8 +446,7 @@ DataStreamRecvr::DataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
- _mem_tracker = MemTracker::CreateTracker(
- _profile, -1, "DataStreamRecvr", parent_tracker);
+ _mem_tracker = MemTracker::CreateTracker(_profile, -1, "DataStreamRecvr", parent_tracker);
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index fa7c979..2884374 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -52,35 +52,35 @@
namespace doris {
DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
- const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
- bool send_query_statistics_with_every_batch)
- : _parent(parent),
- _buffer_size(buffer_size),
- _row_desc(row_desc),
- _fragment_instance_id(fragment_instance_id),
- _dest_node_id(dest_node_id),
- _num_data_bytes_sent(0),
- _packet_seq(0),
- _need_close(false),
- _be_number(0),
- _brpc_dest_addr(brpc_dest),
- _is_transfer_chain(is_transfer_chain),
- _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
+ const TNetworkAddress& brpc_dest,
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
+ int buffer_size, bool is_transfer_chain,
+ bool send_query_statistics_with_every_batch)
+ : _parent(parent),
+ _buffer_size(buffer_size),
+ _row_desc(row_desc),
+ _fragment_instance_id(fragment_instance_id),
+ _dest_node_id(dest_node_id),
+ _num_data_bytes_sent(0),
+ _packet_seq(0),
+ _need_close(false),
+ _be_number(0),
+ _brpc_dest_addr(brpc_dest),
+ _is_transfer_chain(is_transfer_chain),
+ _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
std::string localhost = BackendOptions::get_localhost();
- _is_local =
- _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
+ _is_local = _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
if (_is_local) {
- LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
+ LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
}
}
DataStreamSender::Channel::~Channel() {
- if (_closure != nullptr && _closure->unref()) {
- delete _closure;
- }
- // release this before request desctruct
- _brpc_request.release_finst_id();
+ if (_closure != nullptr && _closure->unref()) {
+ delete _closure;
+ }
+ // release this before request desctruct
+ _brpc_request.release_finst_id();
}
Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -185,7 +185,7 @@ Status DataStreamSender::Channel::send_current_batch(bool eos) {
}
{
SCOPED_TIMER(_parent->_serialize_batch_timer);
- int uncompressed_bytes = _batch->serialize(&_pb_batch);
+ size_t uncompressed_bytes = _batch->serialize(&_pb_batch);
COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch));
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
}
@@ -259,13 +259,12 @@ Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
: _pool(pool),
- _sender_id(sender_id),
- _row_desc(row_desc),
- _serialize_batch_timer(NULL),
- _bytes_sent_counter(NULL),
- _local_bytes_send_counter(NULL),
- _current_pb_batch(&_pb_batch1) {
-}
+ _sender_id(sender_id),
+ _row_desc(row_desc),
+ _serialize_batch_timer(NULL),
+ _bytes_sent_counter(NULL),
+ _local_bytes_send_counter(NULL),
+ _current_pb_batch(&_pb_batch1) {}
DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
@@ -648,8 +647,8 @@ Status DataStreamSender::serialize_batch(RowBatch* src, T* dest, int num_receive
SCOPED_TIMER(_serialize_batch_timer);
// TODO(zc)
// RETURN_IF_ERROR(src->serialize(dest));
- int uncompressed_bytes = src->serialize(dest);
- int bytes = RowBatch::get_batch_size(*dest);
+ size_t uncompressed_bytes = src->serialize(dest);
+ size_t bytes = RowBatch::get_batch_size(*dest);
// TODO(zc)
// int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
// The size output_batch would be if we didn't compress tuple_data (will be equal to
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 0c76a67..25855f6 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -370,7 +370,7 @@ RowBatch::~RowBatch() {
clear();
}
-int RowBatch::serialize(TRowBatch* output_batch) {
+size_t RowBatch::serialize(TRowBatch* output_batch) {
// why does Thrift not generate a Clear() function?
output_batch->row_tuples.clear();
output_batch->tuple_offsets.clear();
@@ -437,7 +437,7 @@ int RowBatch::serialize(TRowBatch* output_batch) {
return get_batch_size(*output_batch) - output_batch->tuple_data.size() + size;
}
-int RowBatch::serialize(PRowBatch* output_batch) {
+size_t RowBatch::serialize(PRowBatch* output_batch) {
// num_rows
output_batch->set_num_rows(_num_rows);
// row_tuples
@@ -625,15 +625,15 @@ void RowBatch::transfer_resource_ownership(RowBatch* dest) {
reset();
}
-int RowBatch::get_batch_size(const TRowBatch& batch) {
- int result = batch.tuple_data.size();
+size_t RowBatch::get_batch_size(const TRowBatch& batch) {
+ size_t result = batch.tuple_data.size();
result += batch.row_tuples.size() * sizeof(TTupleId);
result += batch.tuple_offsets.size() * sizeof(int32_t);
return result;
}
-int RowBatch::get_batch_size(const PRowBatch& batch) {
- int result = batch.tuple_data().size();
+size_t RowBatch::get_batch_size(const PRowBatch& batch) {
+ size_t result = batch.tuple_data().size();
result += batch.row_tuples().size() * sizeof(int32_t);
result += batch.tuple_offsets().size() * sizeof(int32_t);
return result;
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index a25c1a7..b0733e6 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -350,12 +350,12 @@ public:
// This function does not reset().
// Returns the uncompressed serialized size (this will be the true size of output_batch
// if tuple_data is actually uncompressed).
- int serialize(TRowBatch* output_batch);
- int serialize(PRowBatch* output_batch);
+ size_t serialize(TRowBatch* output_batch);
+ size_t serialize(PRowBatch* output_batch);
// Utility function: returns total size of batch.
- static int get_batch_size(const TRowBatch& batch);
- static int get_batch_size(const PRowBatch& batch);
+ static size_t get_batch_size(const TRowBatch& batch);
+ static size_t get_batch_size(const PRowBatch& batch);
int num_rows() const { return _num_rows; }
int capacity() const { return _capacity; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index c719409..c182377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -409,7 +409,8 @@ public class Column implements Writable {
}
// now we support convert decimal to varchar type
- if (getDataType() == PrimitiveType.DECIMALV2 && other.getDataType() == PrimitiveType.VARCHAR) {
+ if (getDataType() == PrimitiveType.DECIMALV2 && (other.getDataType() == PrimitiveType.VARCHAR
+ || other.getDataType() == PrimitiveType.STRING)) {
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org