You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/15 02:07:42 UTC
[incubator-seatunnel] branch dev updated: [Improve][Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c2cd0896e [Improve][Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)
c2cd0896e is described below
commit c2cd0896e10127ac26852ab3d8f2ff86c628241b
Author: Jia Fan <fa...@qq.com>
AuthorDate: Mon May 15 10:07:35 2023 +0800
[Improve][Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)
---
.../engine/core/checkpoint/CheckpointType.java | 9 +++++++++
.../engine/server/serializable/RecordSerializer.java | 2 +-
.../server/serializable/RecordSerializerHook.java | 2 --
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 3 ++-
.../META-INF/services/com.hazelcast.SerializerHook | 18 ++++++++++++++++++
5 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
index 09201c66b..ab012ed87 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -31,6 +31,15 @@ public enum CheckpointType {
private final boolean auto;
private final String name;
+ public static CheckpointType fromName(String name) {
+ for (CheckpointType type : CheckpointType.values()) {
+ if (type.name.equals(name)) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("Unknown checkpoint type: " + name);
+ }
+
CheckpointType(boolean auto, String name) {
this.auto = auto;
this.name = name;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
index e27d68074..90caab3c1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializer.java
@@ -67,7 +67,7 @@ public class RecordSerializer implements StreamSerializer<Record> {
if (dataType == RecordDataType.CHECKPOINT_BARRIER.ordinal()) {
data =
new CheckpointBarrier(
- in.readLong(), in.readLong(), CheckpointType.valueOf(in.readString()));
+ in.readLong(), in.readLong(), CheckpointType.fromName(in.readString()));
} else if (dataType == RecordDataType.SEATUNNEL_ROW.ordinal()) {
String tableId = in.readString();
byte rowKind = in.readByte();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
index a9cccc7b9..6e2c594b3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/RecordSerializerHook.java
@@ -19,11 +19,9 @@ package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.api.table.type.Record;
-import com.google.auto.service.AutoService;
import com.hazelcast.nio.serialization.Serializer;
import com.hazelcast.nio.serialization.SerializerHook;
-@AutoService(SerializerHook.class)
public class RecordSerializerHook implements SerializerHook<Record> {
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 8637ea7c3..9f02d7a94 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -168,7 +168,8 @@ public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle
for (Map.Entry<String, Queue<Record<?>>> shuffleBatch : shuffleBuffer.entrySet()) {
IQueue<Record<?>> shuffleQueue = shuffles.get(shuffleBatch.getKey());
Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
- if (!shuffleQueue.addAll(shuffleBatch.getValue())) {
+ if (shuffleQueue.remainingCapacity() <= 0
+ || !shuffleQueue.addAll(shuffleBatch.getValue())) {
for (; ; ) {
Record<?> shuffleItem = shuffleQueueBatch.poll();
if (shuffleItem == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.SerializerHook b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.SerializerHook
new file mode 100644
index 000000000..aa82a10fc
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/resources/META-INF/services/com.hazelcast.SerializerHook
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.engine.server.serializable.RecordSerializerHook