You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/28 14:20:40 UTC

[inlong] branch release-1.3.0 updated (bdaf0e903 -> 7a2f34588)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from bdaf0e903 [INLONG-5642][SDK] Change SDK(cpp) logger framework (#5715)
     new 303a21795 [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed(addendum) (#5721)
     new 7a2f34588 [INLONG-5718][Manager][DataProxy] Fix log4j2 configuration does not take effect (#5719)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-dataproxy/conf/log4j2.xml                   |  4 +-
 .../manager/dao/entity/StreamSourceEntity.java     | 34 +++++++++++
 .../manager-web/src/main/resources/log4j2.xml      |  4 +-
 .../tubemq-client-cpp/src/baseconsumer.cc          | 68 +++++++++++++---------
 4 files changed, 78 insertions(+), 32 deletions(-)


[inlong] 02/02: [INLONG-5718][Manager][DataProxy] Fix log4j2 configuration does not take effect (#5719)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7a2f34588b998a876fd6913fab4556fc3ce250a0
Author: emhui <11...@users.noreply.github.com>
AuthorDate: Sun Aug 28 18:46:19 2022 +0800

    [INLONG-5718][Manager][DataProxy] Fix log4j2 configuration does not take effect (#5719)
---
 inlong-dataproxy/conf/log4j2.xml                   |  4 +--
 .../manager/dao/entity/StreamSourceEntity.java     | 34 ++++++++++++++++++++++
 .../manager-web/src/main/resources/log4j2.xml      |  4 +--
 3 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/inlong-dataproxy/conf/log4j2.xml b/inlong-dataproxy/conf/log4j2.xml
index 71ccb85e5..e51602b65 100644
--- a/inlong-dataproxy/conf/log4j2.xml
+++ b/inlong-dataproxy/conf/log4j2.xml
@@ -17,10 +17,10 @@
     specific language governing permissions and limitations
     under the License.
 -->
-<configuration status="WARN" monitorInterval="30" package="org.apache.inlong.dataproxy.config">
+<configuration status="WARN" monitorInterval="30" packages="org.apache.inlong.dataproxy.config">
     <Properties>
         <property name="basePath">${sys:dataproxy.log.path}</property>
-        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.}:%L %mask%n</property>
+        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p ${PID:-} [%15.15t] %-30.30C{1.}:%L %mask %n</property>
         <property name="every_file_size">1G</property>
         <property name="output_log_level">DEBUG</property>
         <property name="rolling_max">50</property>
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 2d7a1aa9d..dab6db269 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -21,6 +21,8 @@ import lombok.Data;
 
 import java.io.Serializable;
 import java.util.Date;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.util.MaskDataUtils;
 
 /**
  * Stream source entity, including source type, source name, etc.
@@ -56,4 +58,36 @@ public class StreamSourceEntity implements Serializable {
     private Date createTime;
     private Date modifyTime;
 
+    @Override
+    public String toString() {
+        if (StringUtils.isNotEmpty(extParams)) {
+            StringBuilder buffer = new StringBuilder(extParams);
+            MaskDataUtils.mask(buffer);
+            extParams = buffer.toString();
+        }
+        return "StreamSourceEntity{"
+                + "id=" + id
+                + ", inlongGroupId='" + inlongGroupId + '\''
+                + ", inlongStreamId='" + inlongStreamId + '\''
+                + ", sourceType='" + sourceType + '\''
+                + ", sourceName='" + sourceName + '\''
+                + ", templateId=" + templateId
+                + ", agentIp='" + agentIp + '\''
+                + ", uuid='" + uuid + '\''
+                + ", dataNodeName='" + dataNodeName + '\''
+                + ", inlongClusterName='" + inlongClusterName + '\''
+                + ", serializationType='" + serializationType + '\''
+                + ", snapshot='" + snapshot + '\''
+                + ", reportTime=" + reportTime
+                + ", extParams='" + extParams + '\''
+                + ", version=" + version
+                + ", status=" + status
+                + ", previousStatus=" + previousStatus
+                + ", isDeleted=" + isDeleted
+                + ", creator='" + creator + '\''
+                + ", modifier='" + modifier + '\''
+                + ", createTime=" + createTime + '\''
+                + ", modifyTime=" + modifyTime
+                + '}';
+    }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-web/src/main/resources/log4j2.xml b/inlong-manager/manager-web/src/main/resources/log4j2.xml
index 7add2849e..674f92e76 100644
--- a/inlong-manager/manager-web/src/main/resources/log4j2.xml
+++ b/inlong-manager/manager-web/src/main/resources/log4j2.xml
@@ -17,10 +17,10 @@
     specific language governing permissions and limitations
     under the License.
 -->
-<configuration status="WARN" monitorInterval="30" package="org.apache.inlong.manager.web.config">
+<configuration status="WARN" monitorInterval="30" packages="org.apache.inlong.manager.web.config">
     <properties>
         <property name="basePath">logs</property>
-        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%5.30t] %-30.30C{1.}:%L - %mask%n</property>
+        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%5.30t] %-30.30C{1.}:%L - %mask %n</property>
         <property name="every_file_size">1G</property>
         <property name="rolling_max">50</property>
         <property name="all_fileName">${basePath}/manager-all.log</property>


[inlong] 01/02: [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed(addendum) (#5721)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 303a2179543c4570794621097a338d0f1e4271db
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Aug 27 20:56:47 2022 +0800

    [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed(addendum) (#5721)
---
 .../tubemq-client-cpp/src/baseconsumer.cc          | 68 +++++++++++++---------
 1 file changed, 40 insertions(+), 28 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index c7152a16e..5fbff8ddc 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -127,27 +127,23 @@ void BaseConsumer::ShutDown() {
   // 3. close all brokers
   closeAllBrokers();
   // 4. check master hb thread status
-  int check_count = 5;
-  while (master_hb_status_.Get() != 0) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(30));
-    if (--check_count <= 0) {
-      LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s",
-               master_hb_status_.Get(), client_uuid_.c_str());  
-      break;
-    }
-  }
-  check_count = 5;
-  while (master_reg_status_.Get() != 0) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(30));
-    if (--check_count <= 0) {
-      LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s",
-               master_reg_status_.Get(), client_uuid_.c_str());  
-      break;
+  int check_count = 0;
+  while (master_hb_status_.Get() != 0 || master_reg_status_.Get() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(40));
+    if (++check_count % 10 == 0) {
+      if (check_count >= 1000) {
+          LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, exit, client=%s",
+                   master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str());
+        break;
+      } else {
+        LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, continue, client=%s",
+                 master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str());
+      }
     }
   }
   // 5. join hb thread;
-  heart_beat_timer_ = nullptr;
   rebalance_thread_ptr_->join();
+  heart_beat_timer_ = nullptr;
   rebalance_thread_ptr_ = nullptr;
   // 6. remove client stub
   TubeMQService::Instance()->RmvClientObj(shared_from_this());
@@ -539,21 +535,26 @@ void BaseConsumer::heartBeat2Master() {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
-  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+        if (GetClientIndex() == tb_config::kInvalidValue ||
+          !TubeMQService::Instance()->IsRunning() ||
+          !isClientRunning()) {
+          master_hb_status_.CompareAndSet(1, 0);
+          return;
+        }
         if (error.Value() != err_code::kErrSuccess) {
           master_sh_retry_cnt_++;
           LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
                    target_ip.c_str(), target_port, error.Message().c_str(), client_uuid_.c_str());
           if (master_sh_retry_cnt_ >= tb_config::kMaxMasterHBRetryCount) {
-              LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s",
-                       master_sh_retry_cnt_, client_uuid_.c_str());
-              master_sh_retry_cnt_ = 0;
-              is_master_actived_.Set(false);
-              asyncRegister2Master(true);
-              master_hb_status_.CompareAndSet(1, 0);
-              return;
+            LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s",
+                     master_sh_retry_cnt_, client_uuid_.c_str());
+            master_sh_retry_cnt_ = 0;
+            is_master_actived_.Set(false);
+            asyncRegister2Master(true);
+            master_hb_status_.CompareAndSet(1, 0);
+            return;
           }
         } else {
           // process response
@@ -578,7 +579,14 @@ void BaseConsumer::heartBeat2Master() {
             }
           }
         }
+        if (GetClientIndex() == tb_config::kInvalidValue ||
+          !TubeMQService::Instance()->IsRunning() ||
+          !isClientRunning()) {
+          master_hb_status_.CompareAndSet(1, 0);
+          return;
+        }
         heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
+        auto self = shared_from_this();
         heart_beat_timer_->async_wait([self, this](const std::error_code& ec) {
           if (ec) {
             return;
@@ -786,9 +794,13 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
-  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+        if (GetClientIndex() == tb_config::kInvalidValue ||
+          !TubeMQService::Instance()->IsRunning() ||
+          !isClientRunning()) {
+          return;
+        }
         if (error.Value() != err_code::kErrSuccess) {
           LOG_WARN("[Heartbeat2Broker] request network  to failure (%s), ression is %s",
                    broker_info.GetAddrInfo().c_str(), error.Message().c_str());