You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/15 02:07:42 UTC

[incubator-seatunnel] branch dev updated: [Improve][Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c2cd0896e [Improve][Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)
c2cd0896e is described below

commit c2cd0896e10127ac26852ab3d8f2ff86c628241b
Author: Jia Fan <fa...@qq.com>
AuthorDate: Mon May 15 10:07:35 2023 +0800

    [Improve][Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)
---
 .../engine/core/checkpoint/CheckpointType.java         |  9 +++++++++
 .../engine/server/serializable/RecordSerializer.java   |  2 +-
 .../server/serializable/RecordSerializerHook.java      |  2 --
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java     |  3 ++-
 .../META-INF/services/com.hazelcast.SerializerHook     | 18 ++++++++++++++++++
 5 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
index 09201c66b..ab012ed87 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -31,6 +31,15 @@ public enum CheckpointType {
     private final boolean auto;
     private final String name;
 
+    public static CheckpointType fromName(String name) {
+        for (CheckpointType type : CheckpointType.values()) {
+            if (type.name.equals(name)) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException("Unknown checkpoint type: " + name);
+    }
+
     CheckpointType(boolean auto, String name) {
         this.auto = auto;
         this.name = name;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
index e27d68074..90caab3c1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
@@ -67,7 +67,7 @@ public class RecordSerializer implements StreamSerializer<Record> {
         if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) {
             data =
                     new CheckpointBarrier(
-                            in.readLong(), in.readLong(), CheckpointType.valueOf(in.readString()));
+                            in.readLong(), in.readLong(), CheckpointType.fromName(in.readString()));
         } else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) {
             String tableId = in.readString();
             byte rowKind = in.readByte();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
index a9cccc7b9..6e2c594b3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
@@ -19,11 +19,9 @@ package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.api.table.type.Record;
 
-import com.google.auto.service.AutoService;
 import com.hazelcast.nio.serialization.Serializer;
 import com.hazelcast.nio.serialization.SerializerHook;
 
-@AutoService(SerializerHook.class)
 public class RecordSerializerHook implements SerializerHook<Record> {
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 8637ea7c3..9f02d7a94 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -168,7 +168,8 @@ public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle
         for (Map.Entry<String, Queue<Record<?>>> shuffleBatch : shuffleBuffer.entrySet()) {
             IQueue<Record<?>> shuffleQueue = shuffles.get(shuffleBatch.getKey());
             Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
-            if (!shuffleQueue.addAll(shuffleBatch.getValue())) {
+            if (shuffleQueue.remainingCapacity() <= 0
+                    || !shuffleQueue.addAll(shuffleBatch.getValue())) {
                 for (; ; ) {
                     Record<?> shuffleItem = shuffleQueueBatch.poll();
                     if (shuffleItem == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.SerializerHook b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.SerializerHook
new file mode 100644
index 000000000..aa82a10fc
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.SerializerHook
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.engine.server.serializable.RecordSerializerHook