You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/25 08:51:19 UTC
[flink] 02/02: [hotfix][table-planner] Clean up serde classes
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d8748316f597e6936442dc514f266ffbcce4bed
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Feb 24 14:21:25 2022 +0100
[hotfix][table-planner] Clean up serde classes
---
.../plan/abilities/sink/WritingMetadataSpec.java | 6 ---
.../abilities/source/SourceAbilitySpecBase.java | 6 ---
.../planner/plan/logical/WindowingStrategy.java | 6 ---
.../table/planner/plan/nodes/exec/ExecNode.java | 6 ---
.../planner/plan/nodes/exec/InputProperty.java | 6 ---
.../plan/nodes/exec/common/CommonExecValues.java | 10 +---
.../exec/serde/AggregateCallJsonDeserializer.java | 12 +++--
.../exec/serde/AggregateCallJsonSerializer.java | 18 +++----
.../exec/serde/ChangelogModeJsonDeserializer.java | 14 ++---
.../exec/serde/ChangelogModeJsonSerializer.java | 11 ++--
.../nodes/exec/serde/ColumnJsonDeserializer.java | 11 +++-
.../nodes/exec/serde/ColumnJsonSerializer.java | 31 ++++++-----
.../ContextResolvedTableJsonDeserializer.java | 11 +++-
.../serde/ContextResolvedTableJsonSerializer.java | 16 ++++--
.../nodes/exec/serde/DataTypeJsonDeserializer.java | 4 +-
.../nodes/exec/serde/DataTypeJsonSerializer.java | 20 ++++----
.../exec/serde/ExecNodeGraphJsonDeserializer.java | 13 +++--
.../exec/serde/ExecNodeGraphJsonSerializer.java | 11 ++--
.../exec/serde/FlinkVersionJsonDeserializer.java | 12 +++--
.../exec/serde/FlinkVersionJsonSerializer.java | 12 +++--
.../plan/nodes/exec/serde/JsonPlanEdge.java | 24 +++++----
.../plan/nodes/exec/serde/JsonPlanGraph.java | 16 +++---
.../plan/nodes/exec/serde/JsonSerdeUtil.java | 24 ++++-----
.../exec/serde/LogicalTypeJsonDeserializer.java | 4 +-
.../exec/serde/LogicalTypeJsonSerializer.java | 60 +++++++++++-----------
.../exec/serde/LogicalWindowJsonDeserializer.java | 11 ++--
.../exec/serde/LogicalWindowJsonSerializer.java | 39 +++++++-------
.../serde/ObjectIdentifierJsonDeserializer.java | 12 +++--
.../exec/serde/ObjectIdentifierJsonSerializer.java | 12 +++--
.../exec/serde/RelDataTypeJsonDeserializer.java | 4 +-
.../exec/serde/RelDataTypeJsonSerializer.java | 4 +-
.../RequiredDistributionJsonDeserializer.java | 15 ++++--
.../serde/RequiredDistributionJsonSerializer.java | 12 +++--
.../ResolvedCatalogTableJsonDeserializer.java | 11 +++-
.../serde/ResolvedCatalogTableJsonSerializer.java | 19 ++++---
.../serde/ResolvedExpressionJsonDeserializer.java | 11 +++-
.../serde/ResolvedExpressionJsonSerializer.java | 19 ++++---
.../exec/serde/ResolvedSchemaJsonDeserializer.java | 11 +++-
.../exec/serde/ResolvedSchemaJsonSerializer.java | 17 ++++--
.../nodes/exec/serde/RexNodeJsonDeserializer.java | 4 +-
.../nodes/exec/serde/RexNodeJsonSerializer.java | 60 +++++++++++-----------
.../exec/serde/RexWindowBoundJsonDeserializer.java | 11 ++--
.../exec/serde/RexWindowBoundJsonSerializer.java | 28 +++++-----
.../plan/nodes/exec/serde/SerdeContext.java | 2 +
.../nodes/exec/serde/ShuffleJsonDeserializer.java | 15 ++++--
.../nodes/exec/serde/ShuffleJsonSerializer.java | 12 +++--
.../nodes/exec/serde/UniqueConstraintMixin.java | 10 ++--
.../plan/nodes/exec/serde/WatermarkSpecMixin.java | 6 ++-
.../serde/WindowReferenceJsonDeserializer.java | 4 +-
.../exec/serde/WindowReferenceJsonSerializer.java | 8 +--
.../planner/plan/nodes/exec/spec/OverSpec.java | 9 ----
.../stream/StreamExecGlobalGroupAggregate.java | 6 ---
.../stream/StreamExecGlobalWindowAggregate.java | 6 ---
.../stream/StreamExecGroupWindowAggregate.java | 6 ---
.../StreamExecIncrementalGroupAggregate.java | 6 ---
.../plan/nodes/exec/stream/StreamExecSink.java | 6 ---
.../table/planner/plan/utils/LookupJoinUtil.java | 6 ---
57 files changed, 416 insertions(+), 350 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
index 46a9845..dcdd948 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
@@ -21,8 +21,6 @@ package org.apache.flink.table.planner.plan.abilities.sink;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
@@ -31,8 +29,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
import java.util.Objects;
@@ -52,8 +48,6 @@ public final class WritingMetadataSpec implements SinkAbilitySpec {
private final List<String> metadataKeys;
@JsonProperty(FIELD_NAME_CONSUMED_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
private final LogicalType consumedType;
@JsonCreator
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
index 8764205..1b5cbf3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
@@ -18,15 +18,11 @@
package org.apache.flink.table.planner.plan.abilities.source;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nullable;
@@ -40,8 +36,6 @@ public abstract class SourceAbilitySpecBase implements SourceAbilitySpec {
public static final String FIELD_NAME_PRODUCED_TYPE = "producedType";
@JsonProperty(FIELD_NAME_PRODUCED_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
private final @Nullable RowType producedType;
public SourceAbilitySpecBase() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
index d825419..47bca8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
@@ -18,8 +18,6 @@
package org.apache.flink.table.planner.plan.logical;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
@@ -27,8 +25,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
/** Logical representation of a windowing strategy. */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "strategy")
@@ -46,8 +42,6 @@ public abstract class WindowingStrategy {
protected final WindowSpec window;
@JsonProperty(value = FIELD_NAME_TIME_ATTRIBUTE_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
protected final LogicalType timeAttributeType;
@JsonProperty(FIELD_NAME_IS_ROWTIME)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 229305f..81d07b8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -20,8 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.exec;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.types.logical.LogicalType;
@@ -30,8 +28,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
import java.util.List;
@@ -75,8 +71,6 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {
* data structures.
*/
@JsonProperty(value = FIELD_NAME_OUTPUT_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
LogicalType getOutputType();
/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
index d4ecbcf..a4c8205 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
@@ -20,14 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.exec;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Arrays;
import java.util.Objects;
@@ -82,8 +78,6 @@ public class InputProperty {
* corresponding input.
*/
@JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION)
- @JsonSerialize(using = RequiredDistributionJsonSerializer.class)
- @JsonDeserialize(using = RequiredDistributionJsonDeserializer.class)
private final RequiredDistribution requiredDistribution;
/** How does the input record trigger the output behavior of the target {@link ExecNode}. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
index 5a47186..21be27f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
@@ -27,14 +27,12 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
import org.apache.flink.table.runtime.operators.values.ValuesInputFormat;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
import java.util.Collections;
import java.util.List;
@@ -78,12 +76,8 @@ public abstract class CommonExecValues extends ExecNodeBase<RowData>
return transformation;
}
- /**
- * In order to use {@link RexNodeJsonSerializer} to serialize {@link RexLiteral}, so we force
- * cast element of tuples to {@link RexNode} which is the parent class of {@link RexLiteral}.
- */
@JsonProperty(value = FIELD_NAME_TUPLES)
- public List<List<RexNode>> getTuples() {
- return (List<List<RexNode>>) (Object) tuples;
+ public List<List<RexLiteral>> getTuples() {
+ return tuples;
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
index 28eef53..6697534 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -41,13 +43,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCall
import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_TYPE;
/**
- * JSON deserializer for {@link AggregateCall}. refer to {@link AggregateCallJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link AggregateCall}.
+ *
+ * @see AggregateCallJsonSerializer for the reverse operation
*/
-public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall> {
+@Internal
+final class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall> {
private static final long serialVersionUID = 1L;
- public AggregateCallJsonDeserializer() {
+ AggregateCallJsonDeserializer() {
super(AggregateCall.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 35e3ac9..345400f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -37,18 +37,18 @@ import java.io.IOException;
* @see AggregateCallJsonDeserializer for the reverse operation
*/
@Internal
-public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
+final class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
private static final long serialVersionUID = 1L;
- public static final String FIELD_NAME_NAME = "name";
- public static final String FIELD_NAME_ARG_LIST = "argList";
- public static final String FIELD_NAME_FILTER_ARG = "filterArg";
- public static final String FIELD_NAME_DISTINCT = "distinct";
- public static final String FIELD_NAME_APPROXIMATE = "approximate";
- public static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
- public static final String FIELD_NAME_TYPE = "type";
+ static final String FIELD_NAME_NAME = "name";
+ static final String FIELD_NAME_ARG_LIST = "argList";
+ static final String FIELD_NAME_FILTER_ARG = "filterArg";
+ static final String FIELD_NAME_DISTINCT = "distinct";
+ static final String FIELD_NAME_APPROXIMATE = "approximate";
+ static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
+ static final String FIELD_NAME_TYPE = "type";
- public AggregateCallJsonSerializer() {
+ AggregateCallJsonSerializer() {
super(AggregateCall.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
index b3c7351..e6590a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
@@ -30,20 +30,22 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
import java.io.IOException;
/**
- * JSON deserializer for {@link ChangelogMode}. refer to {@link ChangelogModeJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link ChangelogMode}.
+ *
+ * @see ChangelogModeJsonSerializer for the reverse operation
*/
-public class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
+@Internal
+final class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
private static final long serialVersionUID = 1L;
- public ChangelogModeJsonDeserializer() {
+ ChangelogModeJsonDeserializer() {
super(ChangelogMode.class);
}
@Override
public ChangelogMode deserialize(
JsonParser jsonParser, DeserializationContext deserializationContext)
- throws IOException, JsonProcessingException {
+ throws IOException {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
JsonNode rowKindsNode = jsonParser.readValueAsTree();
for (JsonNode rowKindNode : rowKindsNode) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
index dbd5a4a..8d0c462 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
@@ -28,13 +29,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
/**
- * JSON serializer for {@link ChangelogMode}. refer to {@link ChangelogModeJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link ChangelogMode}.
+ *
+ * @see ChangelogModeJsonDeserializer for the reverse operation
*/
-public class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
+@Internal
+final class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
private static final long serialVersionUID = 1L;
- public ChangelogModeJsonSerializer() {
+ ChangelogModeJsonSerializer() {
super(ChangelogMode.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
index 15ba34a..6cbce5d3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -47,12 +48,18 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSer
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
-class ColumnJsonDeserializer extends StdDeserializer<Column> {
+/**
+ * JSON deserializer for {@link Column}.
+ *
+ * @see ColumnJsonSerializer for the reverse operation
+ */
+@Internal
+final class ColumnJsonDeserializer extends StdDeserializer<Column> {
private static final String SUPPORTED_KINDS =
Arrays.toString(new String[] {KIND_PHYSICAL, KIND_COMPUTED, KIND_METADATA});
- public ColumnJsonDeserializer() {
+ ColumnJsonDeserializer() {
super(Column.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
index 3d6ded0..4721512 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -28,20 +29,26 @@ import java.io.IOException;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
-class ColumnJsonSerializer extends StdSerializer<Column> {
+/**
+ * JSON serializer for {@link Column}.
+ *
+ * @see ColumnJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ColumnJsonSerializer extends StdSerializer<Column> {
- public static final String KIND = "kind";
- public static final String KIND_PHYSICAL = "PHYSICAL";
- public static final String KIND_COMPUTED = "COMPUTED";
- public static final String KIND_METADATA = "METADATA";
- public static final String NAME = "name";
- public static final String DATA_TYPE = "dataType";
- public static final String COMMENT = "comment";
- public static final String EXPRESSION = "expression";
- public static final String METADATA_KEY = "metadataKey";
- public static final String IS_VIRTUAL = "isVirtual";
+ static final String KIND = "kind";
+ static final String KIND_PHYSICAL = "PHYSICAL";
+ static final String KIND_COMPUTED = "COMPUTED";
+ static final String KIND_METADATA = "METADATA";
+ static final String NAME = "name";
+ static final String DATA_TYPE = "dataType";
+ static final String COMMENT = "comment";
+ static final String EXPRESSION = "expression";
+ static final String METADATA_KEY = "metadataKey";
+ static final String IS_VIRTUAL = "isVirtual";
- public ColumnJsonSerializer() {
+ ColumnJsonSerializer() {
super(Column.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
index c76091d..0828840 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
@@ -45,10 +46,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolv
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
-class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
+/**
+ * JSON deserializer for {@link ContextResolvedTable}.
+ *
+ * @see ContextResolvedTableJsonSerializer for the reverse operation
+ */
+@Internal
+final class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
private static final long serialVersionUID = 1L;
- public ContextResolvedTableJsonDeserializer() {
+ ContextResolvedTableJsonDeserializer() {
super(ContextResolvedTable.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
index 0b5d98e..5101198 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
@@ -30,14 +31,19 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-/** JSON serializer for {@link ContextResolvedTable}. */
-class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
+/**
+ * JSON serializer for {@link ContextResolvedTable}.
+ *
+ * @see ContextResolvedTableJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
private static final long serialVersionUID = 1L;
- public static final String FIELD_NAME_IDENTIFIER = "identifier";
- public static final String FIELD_NAME_CATALOG_TABLE = "resolvedTable";
+ static final String FIELD_NAME_IDENTIFIER = "identifier";
+ static final String FIELD_NAME_CATALOG_TABLE = "resolvedTable";
- public ContextResolvedTableJsonSerializer() {
+ ContextResolvedTableJsonSerializer() {
super(ContextResolvedTable.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
index 3a234b2..21f456e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
@@ -59,9 +59,9 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil
* @see DataTypeJsonSerializer for the reverse operation
*/
@Internal
-public class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
+final class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
- public DataTypeJsonDeserializer() {
+ DataTypeJsonDeserializer() {
super(DataType.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
index 53938a8..8593493 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
@@ -41,7 +41,7 @@ import static org.apache.flink.table.types.utils.DataTypeUtils.isInternal;
* @see DataTypeJsonDeserializer for the reverse operation
*/
@Internal
-public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
+final class DataTypeJsonSerializer extends StdSerializer<DataType> {
private static final long serialVersionUID = 1L;
/*
@@ -83,22 +83,22 @@ public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
*/
// Common fields
- public static final String FIELD_NAME_TYPE = "logicalType";
- public static final String FIELD_NAME_CONVERSION_CLASS = "conversionClass";
+ static final String FIELD_NAME_TYPE = "logicalType";
+ static final String FIELD_NAME_CONVERSION_CLASS = "conversionClass";
// ARRAY, MULTISET
- public static final String FIELD_NAME_ELEMENT_CLASS = "elementClass";
+ static final String FIELD_NAME_ELEMENT_CLASS = "elementClass";
// MAP
- public static final String FIELD_NAME_KEY_CLASS = "keyClass";
- public static final String FIELD_NAME_VALUE_CLASS = "valueClass";
+ static final String FIELD_NAME_KEY_CLASS = "keyClass";
+ static final String FIELD_NAME_VALUE_CLASS = "valueClass";
// ROW, STRUCTURED_TYPE, DISTINCT_TYPE
- public static final String FIELD_NAME_FIELDS = "fields";
- public static final String FIELD_NAME_FIELD_NAME = "name";
- public static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
+ static final String FIELD_NAME_FIELDS = "fields";
+ static final String FIELD_NAME_FIELD_NAME = "name";
+ static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
- public DataTypeJsonSerializer() {
+ DataTypeJsonSerializer() {
super(DataType.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
index 4fef3bb..f353012 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,13 +28,17 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
import java.io.IOException;
/**
- * JSON deserializer for {@link ExecNodeGraph}. It uses the intermediate representation {@link
- * JsonPlanGraph}.
+ * JSON deserializer for {@link ExecNodeGraph}.
+ *
+ * <p>It uses the intermediate representation {@link JsonPlanGraph}.
+ *
+ * @see ExecNodeGraphJsonSerializer for the reverse operation
*/
-public class ExecNodeGraphJsonDeserializer extends StdDeserializer<ExecNodeGraph> {
+@Internal
+final class ExecNodeGraphJsonDeserializer extends StdDeserializer<ExecNodeGraph> {
private static final long serialVersionUID = 1L;
- public ExecNodeGraphJsonDeserializer() {
+ ExecNodeGraphJsonDeserializer() {
super(ExecNodeGraph.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
index c424f91..752f482 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphValidator;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
@@ -29,13 +30,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
/**
- * JSON serializer for {@link ExecNodeGraph}. It uses the intermediate representation {@link
- * JsonPlanGraph}.
+ * JSON serializer for {@link ExecNodeGraph}.
+ *
+ * @see ExecNodeGraphJsonDeserializer for the reverse operation
*/
-public class ExecNodeGraphJsonSerializer extends StdSerializer<ExecNodeGraph> {
+@Internal
+final class ExecNodeGraphJsonSerializer extends StdSerializer<ExecNodeGraph> {
private static final long serialVersionUID = 1L;
- public ExecNodeGraphJsonSerializer() {
+ ExecNodeGraphJsonSerializer() {
super(ExecNodeGraph.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
index 968b93c..7f2fb0a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,11 +28,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
import java.io.IOException;
-/** JSON deserializer for {@link FlinkVersion}. */
-public class FlinkVersionJsonDeserializer extends StdDeserializer<FlinkVersion> {
+/**
+ * JSON deserializer for {@link FlinkVersion}.
+ *
+ * @see FlinkVersionJsonSerializer for the reverse operation
+ */
+@Internal
+final class FlinkVersionJsonDeserializer extends StdDeserializer<FlinkVersion> {
private static final long serialVersionUID = 1L;
- public FlinkVersionJsonDeserializer() {
+ FlinkVersionJsonDeserializer() {
super(FlinkVersion.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
index 3420314..b43c578 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
@@ -26,11 +27,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-/** JSON serializer for {@link FlinkVersion}. */
-public class FlinkVersionJsonSerializer extends StdSerializer<FlinkVersion> {
+/**
+ * JSON serializer for {@link FlinkVersion}.
+ *
+ * @see FlinkVersionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class FlinkVersionJsonSerializer extends StdSerializer<FlinkVersion> {
private static final long serialVersionUID = 1L;
- public FlinkVersionJsonSerializer() {
+ FlinkVersionJsonSerializer() {
super(FlinkVersion.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
index 17c5c5e..f5401b6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -36,12 +37,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotatio
*
* <p>This model is used only during serialization/deserialization.
*/
+@Internal
@JsonIgnoreProperties(ignoreUnknown = true)
-class JsonPlanEdge {
- public static final String FIELD_NAME_SOURCE = "source";
- public static final String FIELD_NAME_TARGET = "target";
- public static final String FIELD_NAME_SHUFFLE = "shuffle";
- public static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
+final class JsonPlanEdge {
+ static final String FIELD_NAME_SOURCE = "source";
+ static final String FIELD_NAME_TARGET = "target";
+ static final String FIELD_NAME_SHUFFLE = "shuffle";
+ static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
/** The source node id of this edge. */
@JsonProperty(FIELD_NAME_SOURCE)
@@ -59,7 +61,7 @@ class JsonPlanEdge {
private final StreamExchangeMode exchangeMode;
@JsonCreator
- public JsonPlanEdge(
+ JsonPlanEdge(
@JsonProperty(FIELD_NAME_SOURCE) int sourceId,
@JsonProperty(FIELD_NAME_TARGET) int targetId,
@JsonProperty(FIELD_NAME_SHUFFLE) ExecEdge.Shuffle shuffle,
@@ -70,24 +72,24 @@ class JsonPlanEdge {
this.exchangeMode = exchangeMode;
}
- public int getSourceId() {
+ int getSourceId() {
return sourceId;
}
- public int getTargetId() {
+ int getTargetId() {
return targetId;
}
- public ExecEdge.Shuffle getShuffle() {
+ ExecEdge.Shuffle getShuffle() {
return shuffle;
}
- public StreamExchangeMode getExchangeMode() {
+ StreamExchangeMode getExchangeMode() {
return exchangeMode;
}
/** Build {@link JsonPlanEdge} from an {@link ExecEdge}. */
- public static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
+ static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
return new JsonPlanEdge(
execEdge.getSource().getId(),
execEdge.getTarget().getId(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
index 9e6278c..4dd4a93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -48,10 +49,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*
* <p>This model is used only during serialization/deserialization.
*/
-class JsonPlanGraph {
- public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
- public static final String FIELD_NAME_NODES = "nodes";
- public static final String FIELD_NAME_EDGES = "edges";
+@Internal
+final class JsonPlanGraph {
+ static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
+ static final String FIELD_NAME_NODES = "nodes";
+ static final String FIELD_NAME_EDGES = "edges";
@JsonProperty(FIELD_NAME_FLINK_VERSION)
private final FlinkVersion flinkVersion;
@@ -63,7 +65,7 @@ class JsonPlanGraph {
private final List<JsonPlanEdge> edges;
@JsonCreator
- public JsonPlanGraph(
+ JsonPlanGraph(
@JsonProperty(FIELD_NAME_FLINK_VERSION) FlinkVersion flinkVersion,
@JsonProperty(FIELD_NAME_NODES) List<ExecNode<?>> nodes,
@JsonProperty(FIELD_NAME_EDGES) List<JsonPlanEdge> edges) {
@@ -72,7 +74,7 @@ class JsonPlanGraph {
this.edges = edges;
}
- public static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
+ static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
final List<ExecNode<?>> allNodes = new ArrayList<>();
final List<JsonPlanEdge> allEdges = new ArrayList<>();
final Set<Integer> nodesIds = new HashSet<>();
@@ -112,7 +114,7 @@ class JsonPlanGraph {
return new JsonPlanGraph(execGraph.getFlinkVersion(), allNodes, allEdges);
}
- public ExecNodeGraph convertToExecNodeGraph() {
+ ExecNodeGraph convertToExecNodeGraph() {
Map<Integer, ExecNode<?>> idToExecNodes = new HashMap<>();
for (ExecNode<?> execNode : nodes) {
int id = execNode.getId();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 7e25612..92bec58 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -31,11 +32,13 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.runtime.groupwindow.WindowReference;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.extraction.ExtractionUtils;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -69,7 +72,8 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.util.Optional;
-/** An utility class that provide abilities for JSON serialization and deserialization. */
+/** A utility class that provide abilities for JSON serialization and deserialization. */
+@Internal
public class JsonSerdeUtil {
/** Return true if the given class's constructors have @JsonCreator annotation, else false. */
@@ -124,7 +128,7 @@ public class JsonSerdeUtil {
private static Module createFlinkTableJacksonModule() {
final SimpleModule module = new SimpleModule("Flink table module");
- ExecNodeMetadataUtil.execNodes().stream()
+ ExecNodeMetadataUtil.execNodes()
.forEach(c -> module.registerSubtypes(new NamedType(c, c.getName())));
registerSerializers(module);
registerDeserializers(module);
@@ -136,15 +140,10 @@ public class JsonSerdeUtil {
private static void registerSerializers(SimpleModule module) {
module.addSerializer(new ExecNodeGraphJsonSerializer());
module.addSerializer(new FlinkVersionJsonSerializer());
- // ObjectIdentifierJsonSerializer is needed for LogicalType serialization
module.addSerializer(new ObjectIdentifierJsonSerializer());
- // LogicalTypeJsonSerializer is needed for RelDataType serialization
module.addSerializer(new LogicalTypeJsonSerializer());
- // DataTypeJsonSerializer is needed for LogicalType serialization
module.addSerializer(new DataTypeJsonSerializer());
- // RelDataTypeJsonSerializer is needed for RexNode serialization
module.addSerializer(new RelDataTypeJsonSerializer());
- // RexNode is used in many exec nodes, so we register its serializer directly here
module.addSerializer(new RexNodeJsonSerializer());
module.addSerializer(new AggregateCallJsonSerializer());
module.addSerializer(new ChangelogModeJsonSerializer());
@@ -156,24 +155,19 @@ public class JsonSerdeUtil {
module.addSerializer(new ResolvedCatalogTableJsonSerializer());
module.addSerializer(new ResolvedExpressionJsonSerializer());
module.addSerializer(new ResolvedSchemaJsonSerializer());
+ module.addSerializer(new RequiredDistributionJsonSerializer());
}
@SuppressWarnings({"unchecked", "rawtypes"})
private static void registerDeserializers(SimpleModule module) {
module.addDeserializer(ExecNodeGraph.class, new ExecNodeGraphJsonDeserializer());
module.addDeserializer(FlinkVersion.class, new FlinkVersionJsonDeserializer());
- // ObjectIdentifierJsonDeserializer is needed for LogicalType deserialization
module.addDeserializer(ObjectIdentifier.class, new ObjectIdentifierJsonDeserializer());
- // LogicalTypeJsonSerializer is needed for RelDataType serialization
module.addDeserializer(LogicalType.class, new LogicalTypeJsonDeserializer());
- // DataTypeJsonDeserializer is needed for LogicalType serialization
+ module.addDeserializer(RowType.class, (StdDeserializer) new LogicalTypeJsonDeserializer());
module.addDeserializer(DataType.class, new DataTypeJsonDeserializer());
- // RelDataTypeJsonSerializer is needed for RexNode serialization
module.addDeserializer(RelDataType.class, new RelDataTypeJsonDeserializer());
- // RexNode is used in many exec nodes, so we register its deserializer directly here
module.addDeserializer(RexNode.class, new RexNodeJsonDeserializer());
- // We need this explicit mapping to make sure Jackson can deserialize POJOs declaring fields
- // with RexLiteral instead of RexNode.
module.addDeserializer(RexLiteral.class, (StdDeserializer) new RexNodeJsonDeserializer());
module.addDeserializer(AggregateCall.class, new AggregateCallJsonDeserializer());
module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
@@ -187,6 +181,8 @@ public class JsonSerdeUtil {
ResolvedCatalogTable.class, new ResolvedCatalogTableJsonDeserializer());
module.addDeserializer(ResolvedExpression.class, new ResolvedExpressionJsonDeserializer());
module.addDeserializer(ResolvedSchema.class, new ResolvedSchemaJsonDeserializer());
+ module.addDeserializer(
+ RequiredDistribution.class, new RequiredDistributionJsonDeserializer());
}
private static void registerMixins(SimpleModule module) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
index df48ca4..99ee1e9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
@@ -97,10 +97,10 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJs
* @see LogicalTypeJsonSerializer for the reverse operation
*/
@Internal
-public class LogicalTypeJsonDeserializer extends StdDeserializer<LogicalType> {
+final class LogicalTypeJsonDeserializer extends StdDeserializer<LogicalType> {
private static final long serialVersionUID = 1L;
- public LogicalTypeJsonDeserializer() {
+ LogicalTypeJsonDeserializer() {
super(LogicalType.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
index 8507984..390ef6e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
@@ -64,56 +64,56 @@ import java.io.IOException;
* @see LogicalTypeJsonDeserializer for the reverse operation.
*/
@Internal
-public final class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+final class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
private static final long serialVersionUID = 1L;
// Common fields
- public static final String FIELD_NAME_TYPE_NAME = "type";
- public static final String FIELD_NAME_NULLABLE = "nullable";
- public static final String FIELD_NAME_DESCRIPTION = "description";
+ static final String FIELD_NAME_TYPE_NAME = "type";
+ static final String FIELD_NAME_NULLABLE = "nullable";
+ static final String FIELD_NAME_DESCRIPTION = "description";
// CHAR, VARCHAR, BINARY, VARBINARY
- public static final String FIELD_NAME_LENGTH = "length";
+ static final String FIELD_NAME_LENGTH = "length";
// TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
- public static final String FIELD_NAME_PRECISION = "precision";
- public static final String FIELD_NAME_TIMESTAMP_KIND = "kind";
+ static final String FIELD_NAME_PRECISION = "precision";
+ static final String FIELD_NAME_TIMESTAMP_KIND = "kind";
// ARRAY, MULTISET
- public static final String FIELD_NAME_ELEMENT_TYPE = "elementType";
+ static final String FIELD_NAME_ELEMENT_TYPE = "elementType";
// MAP
- public static final String FIELD_NAME_KEY_TYPE = "keyType";
- public static final String FIELD_NAME_VALUE_TYPE = "valueType";
+ static final String FIELD_NAME_KEY_TYPE = "keyType";
+ static final String FIELD_NAME_VALUE_TYPE = "valueType";
// ROW
- public static final String FIELD_NAME_FIELDS = "fields";
- public static final String FIELD_NAME_FIELD_NAME = "name";
- public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
- public static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
+ static final String FIELD_NAME_FIELDS = "fields";
+ static final String FIELD_NAME_FIELD_NAME = "name";
+ static final String FIELD_NAME_FIELD_TYPE = "fieldType";
+ static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
// DISTINCT_TYPE
- public static final String FIELD_NAME_SOURCE_TYPE = "sourceType";
+ static final String FIELD_NAME_SOURCE_TYPE = "sourceType";
// STRUCTURED_TYPE
- public static final String FIELD_NAME_OBJECT_IDENTIFIER = "objectIdentifier";
- public static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass";
- public static final String FIELD_NAME_ATTRIBUTES = "attributes";
- public static final String FIELD_NAME_ATTRIBUTE_NAME = "name";
- public static final String FIELD_NAME_ATTRIBUTE_TYPE = "attributeType";
- public static final String FIELD_NAME_ATTRIBUTE_DESCRIPTION = "description";
- public static final String FIELD_NAME_FINAL = "final";
- public static final String FIELD_NAME_INSTANTIABLE = "instantiable";
- public static final String FIELD_NAME_COMPARISON = "comparison";
- public static final String FIELD_NAME_SUPER_TYPE = "superType";
+ static final String FIELD_NAME_OBJECT_IDENTIFIER = "objectIdentifier";
+ static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass";
+ static final String FIELD_NAME_ATTRIBUTES = "attributes";
+ static final String FIELD_NAME_ATTRIBUTE_NAME = "name";
+ static final String FIELD_NAME_ATTRIBUTE_TYPE = "attributeType";
+ static final String FIELD_NAME_ATTRIBUTE_DESCRIPTION = "description";
+ static final String FIELD_NAME_FINAL = "final";
+ static final String FIELD_NAME_INSTANTIABLE = "instantiable";
+ static final String FIELD_NAME_COMPARISON = "comparison";
+ static final String FIELD_NAME_SUPER_TYPE = "superType";
// RAW
- public static final String FIELD_NAME_CLASS = "class";
- public static final String FIELD_NAME_EXTERNAL_DATA_TYPE = "externalDataType";
- public static final String FIELD_NAME_SPECIAL_SERIALIZER = "specialSerializer";
- public static final String FIELD_VALUE_EXTERNAL_SERIALIZER_NULL = "NULL";
+ static final String FIELD_NAME_CLASS = "class";
+ static final String FIELD_NAME_EXTERNAL_DATA_TYPE = "externalDataType";
+ static final String FIELD_NAME_SPECIAL_SERIALIZER = "specialSerializer";
+ static final String FIELD_VALUE_EXTERNAL_SERIALIZER_NULL = "NULL";
- public LogicalTypeJsonSerializer() {
+ LogicalTypeJsonSerializer() {
super(LogicalType.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
index d6d9151..3a0282b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -54,13 +55,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindow
import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.KIND_TUMBLING;
/**
- * JSON deserializer for {@link LogicalWindow}, refer to {@link LogicalWindowJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link LogicalWindow}.
+ *
+ * @see LogicalWindowJsonSerializer for the reverse operation
*/
-public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow> {
+@Internal
+final class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow> {
private static final long serialVersionUID = 1L;
- public LogicalWindowJsonDeserializer() {
+ LogicalWindowJsonDeserializer() {
super(LogicalWindow.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
index 2195b5f..518e7d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -39,31 +40,33 @@ import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration
import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong;
/**
- * JSON serializer for {@link LogicalWindow}, refer to {@link LogicalWindowJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link LogicalWindow}.
+ *
+ * @see LogicalTypeJsonDeserializer for the reverse operation
*/
-public class LogicalWindowJsonSerializer extends StdSerializer<LogicalWindow> {
+@Internal
+final class LogicalWindowJsonSerializer extends StdSerializer<LogicalWindow> {
private static final long serialVersionUID = 1L;
- public static final String FIELD_NAME_KIND = "kind";
- public static final String KIND_TUMBLING = "TUMBLING";
- public static final String KIND_SLIDING = "SLIDING";
- public static final String KIND_SESSION = "SESSION";
+ static final String FIELD_NAME_KIND = "kind";
+ static final String KIND_TUMBLING = "TUMBLING";
+ static final String KIND_SLIDING = "SLIDING";
+ static final String KIND_SESSION = "SESSION";
- public static final String FIELD_NAME_ALIAS = "alias";
- public static final String FIELD_NAME_TIME_FIELD = "timeField";
- public static final String FIELD_NAME_FIELD_NAME = "fieldName";
- public static final String FIELD_NAME_FIELD_INDEX = "fieldIndex";
- public static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
- public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
+ static final String FIELD_NAME_ALIAS = "alias";
+ static final String FIELD_NAME_TIME_FIELD = "timeField";
+ static final String FIELD_NAME_FIELD_NAME = "fieldName";
+ static final String FIELD_NAME_FIELD_INDEX = "fieldIndex";
+ static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
+ static final String FIELD_NAME_FIELD_TYPE = "fieldType";
- public static final String FIELD_NAME_SIZE = "size";
- public static final String FIELD_NAME_IS_TIME_WINDOW = "isTimeWindow";
+ static final String FIELD_NAME_SIZE = "size";
+ static final String FIELD_NAME_IS_TIME_WINDOW = "isTimeWindow";
- public static final String FIELD_NAME_SLIDE = "slide";
- public static final String FIELD_NAME_GAP = "gap";
+ static final String FIELD_NAME_SLIDE = "slide";
+ static final String FIELD_NAME_GAP = "gap";
- public LogicalWindowJsonSerializer() {
+ LogicalWindowJsonSerializer() {
super(LogicalWindow.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
index 0492d44..9285483 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -28,11 +29,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
import java.io.IOException;
-/** JSON deserializer for {@link ObjectIdentifier}. */
-public class ObjectIdentifierJsonDeserializer extends StdDeserializer<ObjectIdentifier> {
+/**
+ * JSON deserializer for {@link ObjectIdentifier}.
+ *
+ * @see ObjectIdentifierJsonSerializer for the reverse operation
+ */
+@Internal
+final class ObjectIdentifierJsonDeserializer extends StdDeserializer<ObjectIdentifier> {
private static final long serialVersionUID = 1L;
- public ObjectIdentifierJsonDeserializer() {
+ ObjectIdentifierJsonDeserializer() {
super(ObjectIdentifier.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
index d14e37d..fb23dcd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -26,11 +27,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-/** JSON serializer for {@link ObjectIdentifier}. */
-public class ObjectIdentifierJsonSerializer extends StdSerializer<ObjectIdentifier> {
+/**
+ * JSON serializer for {@link ObjectIdentifier}.
+ *
+ * @see ObjectIdentifierJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ObjectIdentifierJsonSerializer extends StdSerializer<ObjectIdentifier> {
private static final long serialVersionUID = 1L;
- public ObjectIdentifierJsonSerializer() {
+ ObjectIdentifierJsonSerializer() {
super(ObjectIdentifier.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
index b1c8cd0..467fc9e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
@@ -38,10 +38,10 @@ import java.io.IOException;
* @see RelDataTypeJsonSerializer for the reverse operation
*/
@Internal
-public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
+final class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
private static final long serialVersionUID = 1L;
- public RelDataTypeJsonDeserializer() {
+ RelDataTypeJsonDeserializer() {
super(RelDataType.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
index 5d76abb..da86cc5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
@@ -37,10 +37,10 @@ import java.io.IOException;
* @see RelDataTypeJsonDeserializer for the reverse operation.
*/
@Internal
-public final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
+final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
private static final long serialVersionUID = 1L;
- public RelDataTypeJsonSerializer() {
+ RelDataTypeJsonSerializer() {
super(RelDataType.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
index 24db15a..cbbb7a6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
@@ -18,30 +18,35 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import java.io.IOException;
-/** JSON deserializer for {@link RequiredDistribution}. */
-public class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
+/**
+ * JSON deserializer for {@link RequiredDistribution}.
+ *
+ * @see RelDataTypeJsonSerializer for the reverse operation
+ */
+@Internal
+final class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
private static final long serialVersionUID = 1L;
- public RequiredDistributionJsonDeserializer() {
+ RequiredDistributionJsonDeserializer() {
super(RequiredDistribution.class);
}
@Override
public RequiredDistribution deserialize(JsonParser jsonParser, DeserializationContext ctx)
- throws IOException, JsonProcessingException {
+ throws IOException {
JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
DistributionType type =
DistributionType.valueOf(jsonNode.get("type").asText().toUpperCase());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
index df47b80..1817557 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.HashDistribution;
@@ -29,11 +30,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-/** JSON serializer for {@link RequiredDistribution}. */
-public class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
+/**
+ * JSON serializer for {@link RequiredDistribution}.
+ *
+ * @see RequiredDistributionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
private static final long serialVersionUID = 1L;
- public RequiredDistributionJsonSerializer() {
+ RequiredDistributionJsonSerializer() {
super(RequiredDistribution.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
index b19a618..86ce77e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -40,10 +41,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatal
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.PARTITION_KEYS;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.RESOLVED_SCHEMA;
-class ResolvedCatalogTableJsonDeserializer extends StdDeserializer<ResolvedCatalogTable> {
+/**
+ * JSON deserializer for {@link ResolvedCatalogTable}.
+ *
+ * @see ResolvedCatalogTableJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedCatalogTableJsonDeserializer extends StdDeserializer<ResolvedCatalogTable> {
private static final long serialVersionUID = 1L;
- public ResolvedCatalogTableJsonDeserializer() {
+ ResolvedCatalogTableJsonDeserializer() {
super(ResolvedCatalogTable.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
index c088f57..25d9c2f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
@@ -31,17 +32,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
+/**
+ * JSON serializer for {@link ResolvedCatalogTable}.
+ *
+ * @see ResolvedCatalogTableJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
private static final long serialVersionUID = 1L;
static final String SERIALIZE_OPTIONS = "serialize_options";
- public static final String RESOLVED_SCHEMA = "schema";
- public static final String PARTITION_KEYS = "partitionKeys";
- public static final String OPTIONS = "options";
- public static final String COMMENT = "comment";
+ static final String RESOLVED_SCHEMA = "schema";
+ static final String PARTITION_KEYS = "partitionKeys";
+ static final String OPTIONS = "options";
+ static final String COMMENT = "comment";
- public ResolvedCatalogTableJsonSerializer() {
+ ResolvedCatalogTableJsonSerializer() {
super(ResolvedCatalogTable.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
index f513ea9..2d6c1ee 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -41,9 +42,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpre
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpressionJsonSerializer.TYPE;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpressionJsonSerializer.TYPE_REX_NODE_EXPRESSION;
-class ResolvedExpressionJsonDeserializer extends StdDeserializer<ResolvedExpression> {
+/**
+ * JSON deserializer for {@link ResolvedExpression}.
+ *
+ * @see ResolvedExpressionJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedExpressionJsonDeserializer extends StdDeserializer<ResolvedExpression> {
- protected ResolvedExpressionJsonDeserializer() {
+ ResolvedExpressionJsonDeserializer() {
super(ResolvedExpression.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
index 9dfa27d..b1c2009 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.planner.expressions.RexNodeExpression;
@@ -28,14 +29,20 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
+/**
+ * JSON serializer for {@link ResolvedExpression}.
+ *
+ * @see ResolvedExpressionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
- public static final String TYPE = "type";
- public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
- public static final String REX_NODE = "rexNode";
- public static final String SERIALIZABLE_STRING = "serializableString";
+ static final String TYPE = "type";
+ static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+ static final String REX_NODE = "rexNode";
+ static final String SERIALIZABLE_STRING = "serializableString";
- protected ResolvedExpressionJsonSerializer() {
+ ResolvedExpressionJsonSerializer() {
super(ResolvedExpression.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
index 94e20ad..630ce93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
@@ -37,10 +38,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchem
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.PRIMARY_KEY;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.WATERMARK_SPECS;
-class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchema> {
+/**
+ * JSON deserializer for {@link ResolvedSchema}.
+ *
+ * @see ResolvedSchemaJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchema> {
private static final long serialVersionUID = 1L;
- public ResolvedSchemaJsonDeserializer() {
+ ResolvedSchemaJsonDeserializer() {
super(ResolvedSchema.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
index da8bb8b..393d5ab 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -28,14 +29,20 @@ import java.io.IOException;
import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
-class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
+/**
+ * JSON serializer for {@link ResolvedSchema}.
+ *
+ * @see ResolvedSchemaJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
private static final long serialVersionUID = 1L;
- public static final String COLUMNS = "columns";
- public static final String WATERMARK_SPECS = "watermarkSpecs";
- public static final String PRIMARY_KEY = "primaryKey";
+ static final String COLUMNS = "columns";
+ static final String WATERMARK_SPECS = "watermarkSpecs";
+ static final String PRIMARY_KEY = "primaryKey";
- public ResolvedSchemaJsonSerializer() {
+ ResolvedSchemaJsonSerializer() {
super(ResolvedSchema.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index ff9e322..8e52495 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -110,10 +110,10 @@ import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableTo
* @see RexNodeJsonSerializer for the reverse operation
*/
@Internal
-public class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
+final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
private static final long serialVersionUID = 1L;
- public RexNodeJsonDeserializer() {
+ RexNodeJsonDeserializer() {
super(RexNode.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index d38f08a..7e38317 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -72,53 +72,53 @@ import static org.apache.flink.table.planner.typeutils.SymbolUtil.calciteToSeria
* @see RexNodeJsonDeserializer for the reverse operation
*/
@Internal
-public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
+final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
private static final long serialVersionUID = 1L;
// Common fields
- public static final String FIELD_NAME_KIND = "kind";
- public static final String FIELD_NAME_VALUE = "value";
- public static final String FIELD_NAME_TYPE = "type";
- public static final String FIELD_NAME_NAME = "name";
+ static final String FIELD_NAME_KIND = "kind";
+ static final String FIELD_NAME_VALUE = "value";
+ static final String FIELD_NAME_TYPE = "type";
+ static final String FIELD_NAME_NAME = "name";
// INPUT_REF
- public static final String KIND_INPUT_REF = "INPUT_REF";
- public static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
+ static final String KIND_INPUT_REF = "INPUT_REF";
+ static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
// LITERAL
- public static final String KIND_LITERAL = "LITERAL";
+ static final String KIND_LITERAL = "LITERAL";
// Sarg fields and values
- public static final String FIELD_NAME_SARG = "sarg";
- public static final String FIELD_NAME_RANGES = "ranges";
- public static final String FIELD_NAME_BOUND_LOWER = "lower";
- public static final String FIELD_NAME_BOUND_UPPER = "upper";
- public static final String FIELD_NAME_BOUND_TYPE = "boundType";
- public static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
+ static final String FIELD_NAME_SARG = "sarg";
+ static final String FIELD_NAME_RANGES = "ranges";
+ static final String FIELD_NAME_BOUND_LOWER = "lower";
+ static final String FIELD_NAME_BOUND_UPPER = "upper";
+ static final String FIELD_NAME_BOUND_TYPE = "boundType";
+ static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
// Symbol fields
- public static final String FIELD_NAME_SYMBOL = "symbol";
+ static final String FIELD_NAME_SYMBOL = "symbol";
// FIELD_ACCESS
- public static final String KIND_FIELD_ACCESS = "FIELD_ACCESS";
- public static final String FIELD_NAME_EXPR = "expr";
+ static final String KIND_FIELD_ACCESS = "FIELD_ACCESS";
+ static final String FIELD_NAME_EXPR = "expr";
// CORREL_VARIABLE
- public static final String KIND_CORREL_VARIABLE = "CORREL_VARIABLE";
- public static final String FIELD_NAME_CORREL = "correl";
+ static final String KIND_CORREL_VARIABLE = "CORREL_VARIABLE";
+ static final String FIELD_NAME_CORREL = "correl";
// PATTERN_INPUT_REF
- public static final String KIND_PATTERN_INPUT_REF = "PATTERN_INPUT_REF";
- public static final String FIELD_NAME_ALPHA = "alpha";
+ static final String KIND_PATTERN_INPUT_REF = "PATTERN_INPUT_REF";
+ static final String FIELD_NAME_ALPHA = "alpha";
// CALL
- public static final String KIND_CALL = "CALL";
- public static final String FIELD_NAME_OPERANDS = "operands";
- public static final String FIELD_NAME_INTERNAL_NAME = "internalName";
- public static final String FIELD_NAME_SYSTEM_NAME = "systemName";
- public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
- public static final String FIELD_NAME_SYNTAX = "syntax";
- public static final String FIELD_NAME_CLASS = "class";
-
- public RexNodeJsonSerializer() {
+ static final String KIND_CALL = "CALL";
+ static final String FIELD_NAME_OPERANDS = "operands";
+ static final String FIELD_NAME_INTERNAL_NAME = "internalName";
+ static final String FIELD_NAME_SYSTEM_NAME = "systemName";
+ static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+ static final String FIELD_NAME_SYNTAX = "syntax";
+ static final String FIELD_NAME_CLASS = "class";
+
+ RexNodeJsonSerializer() {
super(RexNode.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
index 9f1b8e3..a663222 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -41,12 +42,14 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoun
import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonSerializer.KIND_UNBOUNDED_PRECEDING;
/**
- * JSON deserializer for {@link RexWindowBound}. refer to {@link RexWindowBoundJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link RexWindowBound}.
+ *
+ * @see RexWindowBoundJsonSerializer for the reverse operation
*/
-public class RexWindowBoundJsonDeserializer extends StdDeserializer<RexWindowBound> {
+@Internal
+final class RexWindowBoundJsonDeserializer extends StdDeserializer<RexWindowBound> {
- public RexWindowBoundJsonDeserializer() {
+ RexWindowBoundJsonDeserializer() {
super(RexWindowBound.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
index 021731c..658db94 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -29,25 +30,24 @@ import org.apache.calcite.rex.RexWindowBound;
import java.io.IOException;
/**
- * JSON serializer for {@link RexWindowBound}. refer to {@link RexWindowBoundJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link RexWindowBound}.
*
- * <p>Supports serialize CURRENT_ROW, UNBOUNDED_PRECEDING, UNBOUNDED_FOLLOWING, Preceding Bounded
- * Window and Following Bounded Window.
+ * @see RexWindowBoundJsonDeserializer for the reverse operation
*/
-public class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> {
+@Internal
+final class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> {
- public static final String FIELD_NAME_KIND = "kind";
- public static final String KIND_CURRENT_ROW = "CURRENT_ROW";
- public static final String KIND_UNBOUNDED_PRECEDING = "UNBOUNDED_PRECEDING";
- public static final String KIND_UNBOUNDED_FOLLOWING = "UNBOUNDED_FOLLOWING";
- public static final String KIND_BOUNDED_WINDOW = "BOUNDED_WINDOW";
+ static final String FIELD_NAME_KIND = "kind";
+ static final String KIND_CURRENT_ROW = "CURRENT_ROW";
+ static final String KIND_UNBOUNDED_PRECEDING = "UNBOUNDED_PRECEDING";
+ static final String KIND_UNBOUNDED_FOLLOWING = "UNBOUNDED_FOLLOWING";
+ static final String KIND_BOUNDED_WINDOW = "BOUNDED_WINDOW";
- public static final String FIELD_NAME_IS_PRECEDING = "isPreceding";
- public static final String FIELD_NAME_IS_FOLLOWING = "isFollowing";
- public static final String FIELD_NAME_OFFSET = "offset";
+ static final String FIELD_NAME_IS_PRECEDING = "isPreceding";
+ static final String FIELD_NAME_IS_FOLLOWING = "isFollowing";
+ static final String FIELD_NAME_OFFSET = "offset";
- public RexWindowBoundJsonSerializer() {
+ RexWindowBoundJsonSerializer() {
super(RexWindowBound.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
index daec5ba..1274132 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.planner.calcite.FlinkContext;
@@ -33,6 +34,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
/**
* A context to allow the store user-defined data within ExecNode serialization and deserialization.
*/
+@Internal
public class SerdeContext {
static final String SERDE_CONTEXT_KEY = "serdeCtx";
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
index 2a61222..80ff766 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
@@ -18,29 +18,34 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.Shuffle;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import java.io.IOException;
-/** JSON deserializer for {@link Shuffle}. */
-public class ShuffleJsonDeserializer extends StdDeserializer<Shuffle> {
+/**
+ * JSON deserializer for {@link Shuffle}.
+ *
+ * @see ShuffleJsonSerializer for the reverse operation
+ */
+@Internal
+final class ShuffleJsonDeserializer extends StdDeserializer<Shuffle> {
private static final long serialVersionUID = 1L;
- public ShuffleJsonDeserializer() {
+ ShuffleJsonDeserializer() {
super(Shuffle.class);
}
@Override
public Shuffle deserialize(JsonParser jsonParser, DeserializationContext ctx)
- throws IOException, JsonProcessingException {
+ throws IOException {
JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
Shuffle.Type type = Shuffle.Type.valueOf(jsonNode.get("type").asText().toUpperCase());
switch (type) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
index 1607df4..5603186 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.HashShuffle;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.Shuffle;
@@ -28,11 +29,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
import java.io.IOException;
-/** JSON serializer for {@link Shuffle}. */
-public class ShuffleJsonSerializer extends StdSerializer<Shuffle> {
+/**
+ * JSON serializer for {@link Shuffle}.
+ *
+ * @see ShuffleJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ShuffleJsonSerializer extends StdSerializer<Shuffle> {
private static final long serialVersionUID = 1L;
- public ShuffleJsonSerializer() {
+ ShuffleJsonSerializer() {
super(Shuffle.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
index 78a87829..6250157 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.Constraint.ConstraintType;
import org.apache.flink.table.catalog.UniqueConstraint;
@@ -28,12 +29,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import java.util.List;
/** Mixin for {@link UniqueConstraint}. */
+@Internal
abstract class UniqueConstraintMixin {
- public static final String NAME = "name";
- public static final String ENFORCED = "enforced";
- public static final String TYPE = "type";
- public static final String COLUMNS = "columns";
+ static final String NAME = "name";
+ static final String ENFORCED = "enforced";
+ static final String TYPE = "type";
+ static final String COLUMNS = "columns";
@JsonCreator
private UniqueConstraintMixin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
index 26fa1c9..d914c48 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.serde;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -25,10 +26,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
/** Mixin for {@link WatermarkSpec}. */
+@Internal
abstract class WatermarkSpecMixin {
- public static final String ROWTIME_ATTRIBUTE = "rowtimeAttribute";
- public static final String EXPRESSION = "expression";
+ static final String ROWTIME_ATTRIBUTE = "rowtimeAttribute";
+ static final String EXPRESSION = "expression";
@JsonCreator
private WatermarkSpecMixin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
index 2a03d5c..f4abacb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
@@ -40,10 +40,10 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.WindowReferen
*/
@Deprecated
@Internal
-public final class WindowReferenceJsonDeserializer extends StdDeserializer<WindowReference> {
+final class WindowReferenceJsonDeserializer extends StdDeserializer<WindowReference> {
private static final long serialVersionUID = 1L;
- protected WindowReferenceJsonDeserializer() {
+ WindowReferenceJsonDeserializer() {
super(WindowReference.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
index 179a4cd..e1f296c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
@@ -35,13 +35,13 @@ import java.io.IOException;
*/
@Deprecated
@Internal
-public final class WindowReferenceJsonSerializer extends StdSerializer<WindowReference> {
+final class WindowReferenceJsonSerializer extends StdSerializer<WindowReference> {
private static final long serialVersionUID = 1L;
- public static final String FIELD_NAME_NAME = "name";
- public static final String FIELD_NAME_TYPE = "type";
+ static final String FIELD_NAME_NAME = "name";
+ static final String FIELD_NAME_TYPE = "type";
- protected WindowReferenceJsonSerializer() {
+ WindowReferenceJsonSerializer() {
super(WindowReference.class);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
index 63d1fdc..c7ea182 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
@@ -18,15 +18,10 @@
package org.apache.flink.table.planner.plan.nodes.exec.spec;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonSerializer;
-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rex.RexLiteral;
@@ -129,14 +124,10 @@ public class OverSpec {
/** The lower bound of the window. */
@JsonProperty(FIELD_NAME_LOWER_BOUND)
- @JsonSerialize(using = RexWindowBoundJsonSerializer.class)
- @JsonDeserialize(using = RexWindowBoundJsonDeserializer.class)
private final RexWindowBound lowerBound;
/** The upper bound of the window. */
@JsonProperty(FIELD_NAME_UPPER_BOUND)
- @JsonSerialize(using = RexWindowBoundJsonSerializer.class)
- @JsonDeserialize(using = RexWindowBoundJsonDeserializer.class)
private final RexWindowBound upperBound;
/** The agg functions set. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 7b31d27..9e68aa5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -35,8 +35,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -56,8 +54,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
@@ -108,8 +104,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
/** The input row type of this node's local agg. */
@JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
private final RowType localAggInputRowType;
/** Whether this node will generate UPDATE_BEFORE messages. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index b026708..d3c2710 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -34,8 +34,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -57,8 +55,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
@@ -99,8 +95,6 @@ public class StreamExecGlobalWindowAggregate extends StreamExecWindowAggregateBa
/** The input row type of this node's local agg. */
@JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
private final RowType localAggInputRowType;
public StreamExecGlobalWindowAggregate(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
index be4cd76..0e33097 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
@@ -41,8 +41,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -68,8 +66,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
@@ -133,8 +129,6 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
private final AggregateCall[] aggCalls;
@JsonProperty(FIELD_NAME_WINDOW)
- @JsonSerialize(using = LogicalWindowJsonSerializer.class)
- @JsonDeserialize(using = LogicalWindowJsonDeserializer.class)
private final LogicalWindow window;
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index e6e354d..9322899 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -32,8 +32,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -49,8 +47,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
@@ -107,8 +103,6 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
/** The input row type of this node's partial local agg. */
@JsonProperty(FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE)
- @JsonSerialize(using = LogicalTypeJsonSerializer.class)
- @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
private final RowType partialLocalAggInputType;
/** Whether this node consumes retraction messages. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 0577a51..2334159 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -33,8 +33,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.ChangelogModeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.ChangelogModeJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
@@ -43,8 +41,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.ArrayList;
import java.util.Collections;
@@ -80,8 +76,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
@JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
- @JsonSerialize(using = ChangelogModeJsonSerializer.class)
- @JsonDeserialize(using = ChangelogModeJsonDeserializer.class)
private final ChangelogMode inputChangelogMode;
@JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 2b973dc6..0353338 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -24,8 +24,6 @@ import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.functions.UserDefinedFunction;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
@@ -38,8 +36,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rex.RexLiteral;
@@ -77,8 +73,6 @@ public final class LookupJoinUtil {
public final LogicalType sourceType;
@JsonProperty(FIELD_NAME_LITERAL)
- @JsonSerialize(using = RexNodeJsonSerializer.class)
- @JsonDeserialize(using = RexNodeJsonDeserializer.class)
public final RexLiteral literal;
@JsonCreator