You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/09/14 07:56:57 UTC

[incubator-doris] branch master updated: support change column type from decimal to string (#6643)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 61c9d11  support change column type from decimal to string (#6643)
61c9d11 is described below

commit 61c9d11fdb72dccf94876ae3706e7ed492622807
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Tue Sep 14 15:56:44 2021 +0800

    support change column type from decimal to string (#6643)
---
 be/src/olap/rowset/segment_v2/encoding_info.cpp    |  1 +
 be/src/runtime/data_stream_recvr.cc                |  5 +-
 be/src/runtime/data_stream_sender.cpp              | 65 +++++++++++-----------
 be/src/runtime/row_batch.cpp                       | 12 ++--
 be/src/runtime/row_batch.h                         |  8 +--
 .../main/java/org/apache/doris/catalog/Column.java |  3 +-
 6 files changed, 47 insertions(+), 47 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/encoding_info.cpp b/be/src/olap/rowset/segment_v2/encoding_info.cpp
index 801cdb4..e4ce43c 100644
--- a/be/src/olap/rowset/segment_v2/encoding_info.cpp
+++ b/be/src/olap/rowset/segment_v2/encoding_info.cpp
@@ -255,6 +255,7 @@ EncodingInfoResolver::EncodingInfoResolver() {
     _add_map<OLAP_FIELD_TYPE_VARCHAR, PLAIN_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_VARCHAR, PREFIX_ENCODING, true>();
 
+    _add_map<OLAP_FIELD_TYPE_STRING, DICT_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_STRING, PLAIN_ENCODING>();
     _add_map<OLAP_FIELD_TYPE_STRING, PREFIX_ENCODING, true>();
 
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index b836253..49b2495 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -213,7 +213,7 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n
         _packet_seq_map.emplace(be_number, packet_seq);
     }
 
-    int batch_size = RowBatch::get_batch_size(pb_batch);
+    size_t batch_size = RowBatch::get_batch_size(pb_batch);
     COUNTER_UPDATE(_recvr->_bytes_received_counter, batch_size);
 
     // Following situation will match the following condition.
@@ -446,8 +446,7 @@ DataStreamRecvr::DataStreamRecvr(
           _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
-    _mem_tracker = MemTracker::CreateTracker(
-            _profile, -1, "DataStreamRecvr", parent_tracker);
+    _mem_tracker = MemTracker::CreateTracker(_profile, -1, "DataStreamRecvr", parent_tracker);
 
     // Create one queue per sender if is_merging is true.
     int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index fa7c979..2884374 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -52,35 +52,35 @@
 namespace doris {
 
 DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
-                               const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
-                               PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
-                               bool send_query_statistics_with_every_batch)
-       : _parent(parent),
-         _buffer_size(buffer_size),
-         _row_desc(row_desc),
-         _fragment_instance_id(fragment_instance_id),
-         _dest_node_id(dest_node_id),
-         _num_data_bytes_sent(0),
-         _packet_seq(0),
-         _need_close(false),
-         _be_number(0),
-         _brpc_dest_addr(brpc_dest),
-         _is_transfer_chain(is_transfer_chain),
-         _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
+                                   const TNetworkAddress& brpc_dest,
+                                   const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
+                                   int buffer_size, bool is_transfer_chain,
+                                   bool send_query_statistics_with_every_batch)
+        : _parent(parent),
+          _buffer_size(buffer_size),
+          _row_desc(row_desc),
+          _fragment_instance_id(fragment_instance_id),
+          _dest_node_id(dest_node_id),
+          _num_data_bytes_sent(0),
+          _packet_seq(0),
+          _need_close(false),
+          _be_number(0),
+          _brpc_dest_addr(brpc_dest),
+          _is_transfer_chain(is_transfer_chain),
+          _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
     std::string localhost = BackendOptions::get_localhost();
-    _is_local =
-           _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
+    _is_local = _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
     if (_is_local) {
-       LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
+        LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
     }
 }
 
 DataStreamSender::Channel::~Channel() {
-   if (_closure != nullptr && _closure->unref()) {
-       delete _closure;
-   }
-   // release this before request desctruct
-   _brpc_request.release_finst_id();
+    if (_closure != nullptr && _closure->unref()) {
+        delete _closure;
+    }
+    // release this before request desctruct
+    _brpc_request.release_finst_id();
 }
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -185,7 +185,7 @@ Status DataStreamSender::Channel::send_current_batch(bool eos) {
     }
     {
         SCOPED_TIMER(_parent->_serialize_batch_timer);
-        int uncompressed_bytes = _batch->serialize(&_pb_batch);
+        size_t uncompressed_bytes = _batch->serialize(&_pb_batch);
         COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch));
         COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
     }
@@ -259,13 +259,12 @@ Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
         : _pool(pool),
-         _sender_id(sender_id),
-         _row_desc(row_desc),
-         _serialize_batch_timer(NULL),
-         _bytes_sent_counter(NULL),
-         _local_bytes_send_counter(NULL),
-         _current_pb_batch(&_pb_batch1) {
-}
+          _sender_id(sender_id),
+          _row_desc(row_desc),
+          _serialize_batch_timer(NULL),
+          _bytes_sent_counter(NULL),
+          _local_bytes_send_counter(NULL),
+          _current_pb_batch(&_pb_batch1) {}
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                                    const TDataStreamSink& sink,
@@ -648,8 +647,8 @@ Status DataStreamSender::serialize_batch(RowBatch* src, T* dest, int num_receive
         SCOPED_TIMER(_serialize_batch_timer);
         // TODO(zc)
         // RETURN_IF_ERROR(src->serialize(dest));
-        int uncompressed_bytes = src->serialize(dest);
-        int bytes = RowBatch::get_batch_size(*dest);
+        size_t uncompressed_bytes = src->serialize(dest);
+        size_t bytes = RowBatch::get_batch_size(*dest);
         // TODO(zc)
         // int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
         // The size output_batch would be if we didn't compress tuple_data (will be equal to
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 0c76a67..25855f6 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -370,7 +370,7 @@ RowBatch::~RowBatch() {
     clear();
 }
 
-int RowBatch::serialize(TRowBatch* output_batch) {
+size_t RowBatch::serialize(TRowBatch* output_batch) {
     // why does Thrift not generate a Clear() function?
     output_batch->row_tuples.clear();
     output_batch->tuple_offsets.clear();
@@ -437,7 +437,7 @@ int RowBatch::serialize(TRowBatch* output_batch) {
     return get_batch_size(*output_batch) - output_batch->tuple_data.size() + size;
 }
 
-int RowBatch::serialize(PRowBatch* output_batch) {
+size_t RowBatch::serialize(PRowBatch* output_batch) {
     // num_rows
     output_batch->set_num_rows(_num_rows);
     // row_tuples
@@ -625,15 +625,15 @@ void RowBatch::transfer_resource_ownership(RowBatch* dest) {
     reset();
 }
 
-int RowBatch::get_batch_size(const TRowBatch& batch) {
-    int result = batch.tuple_data.size();
+size_t RowBatch::get_batch_size(const TRowBatch& batch) {
+    size_t result = batch.tuple_data.size();
     result += batch.row_tuples.size() * sizeof(TTupleId);
     result += batch.tuple_offsets.size() * sizeof(int32_t);
     return result;
 }
 
-int RowBatch::get_batch_size(const PRowBatch& batch) {
-    int result = batch.tuple_data().size();
+size_t RowBatch::get_batch_size(const PRowBatch& batch) {
+    size_t result = batch.tuple_data().size();
     result += batch.row_tuples().size() * sizeof(int32_t);
     result += batch.tuple_offsets().size() * sizeof(int32_t);
     return result;
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index a25c1a7..b0733e6 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -350,12 +350,12 @@ public:
     // This function does not reset().
     // Returns the uncompressed serialized size (this will be the true size of output_batch
     // if tuple_data is actually uncompressed).
-    int serialize(TRowBatch* output_batch);
-    int serialize(PRowBatch* output_batch);
+    size_t serialize(TRowBatch* output_batch);
+    size_t serialize(PRowBatch* output_batch);
 
     // Utility function: returns total size of batch.
-    static int get_batch_size(const TRowBatch& batch);
-    static int get_batch_size(const PRowBatch& batch);
+    static size_t get_batch_size(const TRowBatch& batch);
+    static size_t get_batch_size(const PRowBatch& batch);
 
     int num_rows() const { return _num_rows; }
     int capacity() const { return _capacity; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index c719409..c182377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -409,7 +409,8 @@ public class Column implements Writable {
         }
 
         // now we support convert decimal to varchar type
-        if (getDataType() == PrimitiveType.DECIMALV2 && other.getDataType() == PrimitiveType.VARCHAR) {
+        if (getDataType() == PrimitiveType.DECIMALV2 && (other.getDataType() == PrimitiveType.VARCHAR
+                || other.getDataType() == PrimitiveType.STRING)) {
             return;
         }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org