Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/13 09:27:59 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #435: [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store

tsreaper opened a new pull request, #435:
URL: https://github.com/apache/flink-table-store/pull/435

   This ticket introduces the `CompactorStoreSink` for compact jobs in Table Store.
   
   The behavior of `CompactorStoreSink` is sketched as follows: the sink accepts records that identify the partitions and buckets to compact, and performs compaction on those buckets.
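
As an illustration only, the behavior described above can be sketched in plain Java without Flink dependencies. The class `CompactorSinkSketch`, the `CompactRequest` type and the `flush()` hook are hypothetical names, not part of the PR. The sketch assumes that repeated requests for the same (partition, bucket) pair are deduplicated, so each bucket is compacted at most once per flush (for example, once per checkpoint).

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical sketch: collects (partition, bucket) requests and compacts each once per flush. */
public class CompactorSinkSketch {

    /** A request to compact one bucket of one partition (illustrative type). */
    public static final class CompactRequest {
        final String partition;
        final int bucket;

        public CompactRequest(String partition, int bucket) {
            this.partition = partition;
            this.bucket = bucket;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompactRequest)) {
                return false;
            }
            CompactRequest other = (CompactRequest) o;
            return bucket == other.bucket && partition.equals(other.partition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(partition, bucket);
        }
    }

    // Buckets waiting to be compacted; a set so that duplicate requests collapse.
    private final Set<CompactRequest> pending = new LinkedHashSet<>();
    // Stand-in for the real compaction routine (e.g. merging a bucket's data files).
    private final BiConsumer<String, Integer> compactor;

    public CompactorSinkSketch(BiConsumer<String, Integer> compactor) {
        this.compactor = compactor;
    }

    /** Called for every incoming record. */
    public void invoke(CompactRequest request) {
        pending.add(request);
    }

    /** Called e.g. before a checkpoint: compacts every requested bucket once. */
    public int flush() {
        int compacted = pending.size();
        for (CompactRequest r : pending) {
            compactor.accept(r.partition, r.bucket);
        }
        pending.clear();
        return compacted;
    }
}
```

Deduplicating in a `LinkedHashSet` keeps the sketch simple and preserves request order; the actual sink may track pending buckets differently.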


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #435: [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #435:
URL: https://github.com/apache/flink-table-store/pull/435#discussion_r1047065211


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/AtMostOnceCommitterOperator.java:
##########
@@ -69,34 +69,24 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     private final String initialCommitUser;
 
     /** Group the committable by the checkpoint id. */
-    private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;

Review Comment:
   Can we have an interface to describe this logic? Maybe a `CommitRetry`?
   That way, we can have only one `CommitterOperator`, and different requirements can use different `CommitRetry` implementations.
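
A rough sketch of the suggestion, under stated assumptions: a single committer keeps the committables grouped by checkpoint id, and a pluggable strategy decides what happens to committables restored after a failover. The `CommitRetry` name comes from the comment; its method `toRecommit`, the `RETRY_ALL` and `DISCARD` strategies, and the use of `String` as a stand-in for `Committable` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical sketch: one committer, with recovery behavior plugged in as a strategy. */
public class CommitRetrySketch {

    /** Decides which restored committables must be committed again after a restore. */
    public interface CommitRetry {
        List<String> toRecommit(NavigableMap<Long, List<String>> restored);
    }

    /** Exactly-once flavor (illustrative): re-commit everything restored from state. */
    public static final CommitRetry RETRY_ALL = restored -> {
        List<String> all = new ArrayList<>();
        restored.values().forEach(all::addAll);
        return all;
    };

    /** At-most-once flavor (illustrative): drop whatever was restored. */
    public static final CommitRetry DISCARD = restored -> new ArrayList<>();

    /** The single committer; only the CommitRetry passed in differs between sinks. */
    public static final class Committer {
        // Committables grouped by checkpoint id, mirroring committablesPerCheckpoint.
        private final NavigableMap<Long, List<String>> committablesPerCheckpoint = new TreeMap<>();
        private final CommitRetry retry;
        private final Consumer<List<String>> commitFn;

        public Committer(CommitRetry retry, Consumer<List<String>> commitFn) {
            this.retry = retry;
            this.commitFn = commitFn;
        }

        public void add(long checkpointId, String committable) {
            committablesPerCheckpoint
                    .computeIfAbsent(checkpointId, id -> new ArrayList<>())
                    .add(committable);
        }

        /** On restore, the strategy alone decides what gets committed again. */
        public void restore(NavigableMap<Long, List<String>> restored) {
            List<String> toCommit = retry.toRecommit(restored);
            if (!toCommit.isEmpty()) {
                commitFn.accept(toCommit);
            }
        }

        /** Commits everything up to and including the completed checkpoint. */
        public void notifyCheckpointComplete(long checkpointId) {
            NavigableMap<Long, List<String>> done =
                    committablesPerCheckpoint.headMap(checkpointId, true);
            List<String> toCommit = new ArrayList<>();
            done.values().forEach(toCommit::addAll);
            done.clear(); // headMap is a view, so this also removes the entries from the backing map
            if (!toCommit.isEmpty()) {
                commitFn.accept(toCommit);
            }
        }
    }
}
```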





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #435: [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #435:
URL: https://github.com/apache/flink-table-store/pull/435#discussion_r1047065211


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/AtMostOnceCommitterOperator.java:
##########
@@ -69,34 +69,24 @@ public class CommitterOperator extends AbstractStreamOperator<Committable>
     private final String initialCommitUser;
 
     /** Group the committable by the checkpoint id. */
-    private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;

Review Comment:
   Can we have an interface to describe this logic? Maybe a `CommitRecover`?
   That way, we can have only one `CommitterOperator`, and different requirements can use different `CommitRecover` implementations.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/AbstractFlinkSink.java:
##########
@@ -158,4 +125,10 @@ private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
                         + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                         + " to exactly-once");
     }
+
+    protected abstract OneInputStreamOperator<RowData, Committable> createWriteOperator(
+            String initialCommitUser, boolean isStreaming);
+
+    protected abstract OneInputStreamOperator<Committable, Committable> createCommitterOperator(

Review Comment:
   Here we could just return a `CommitRecover`: let subclasses return different ones and keep the common part in the base class.
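
One possible reading of this comment is a template method: the base sink owns the shared wiring (write operator followed by a single committer class), and each subclass only returns the parts that differ. The sketch below is plain Java with operators represented as strings; `BaseSink`, `buildTopology`, `writeOperatorName` and the strategy names are hypothetical, and only `CommitRecover` is taken from the comment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the template-method split suggested for the abstract sink. */
public class FlinkSinkTemplateSketch {

    /** Stand-in for the suggested recovery strategy; only its name matters here. */
    public interface CommitRecover {
        String name();
    }

    /** Base sink: owns the shared wiring, subclasses return only what differs. */
    public abstract static class BaseSink {
        public final List<String> buildTopology(boolean isStreaming) {
            List<String> operators = new ArrayList<>();
            operators.add(writeOperatorName(isStreaming));
            // The committer is always the same class; only the strategy plugged into it varies.
            operators.add("CommitterOperator[" + createCommitRecover().name() + "]");
            return operators;
        }

        protected abstract String writeOperatorName(boolean isStreaming);

        protected abstract CommitRecover createCommitRecover();
    }

    /** Regular table sink (strategy name chosen for illustration only). */
    public static final class StoreSink extends BaseSink {
        @Override
        protected String writeOperatorName(boolean isStreaming) {
            return "StoreWriteOperator(streaming=" + isStreaming + ")";
        }

        @Override
        protected CommitRecover createCommitRecover() {
            return () -> "recommit";
        }
    }

    /** Compact-job sink (strategy name chosen for illustration only). */
    public static final class CompactorSink extends BaseSink {
        @Override
        protected String writeOperatorName(boolean isStreaming) {
            return "CompactorWriteOperator(streaming=" + isStreaming + ")";
        }

        @Override
        protected CommitRecover createCommitRecover() {
            return () -> "discard";
        }
    }
}
```

Marking `buildTopology` as `final` keeps the shared part out of reach of subclasses, which is one way to "mask" it.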





[GitHub] [flink-table-store] tsreaper merged pull request #435: [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store

Posted by GitBox <gi...@apache.org>.
tsreaper merged PR #435:
URL: https://github.com/apache/flink-table-store/pull/435




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #435: [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #435:
URL: https://github.com/apache/flink-table-store/pull/435#discussion_r1047070272


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CompactorStreamPartitioner.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link StreamPartitioner} for stand-alone compact job sinks so that the same bucket is compacted
+ * by the same sink parallelism.
+ */
+public class CompactorStreamPartitioner extends StreamPartitioner<RowData> {
+
+    private final RowType rowType;
+
+    private transient RowDataSerializer serializer;
+
+    public CompactorStreamPartitioner(RowType partitionType) {

Review Comment:
   I think we can have a `HashRowStreamPartitioner` and pass a row type to it here.
   The row type can be obtained from `BucketsTable.rowType(RowType partitionType)`.
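
A minimal sketch of the idea behind a hash-based row partitioner, assuming the row carries the partition fields followed by the bucket: equal rows hash equally, so the same bucket is always routed to the same downstream subtask. It uses plain Java lists instead of Flink's `RowData`/`StreamPartitioner`; `HashRowPartitionerSketch` and its `selectChannel(List<?>)` method are hypothetical. A real implementation would more likely hash the row's serialized binary form.

```java
import java.util.List;

/**
 * Hypothetical sketch of a hash-based row partitioner: rows whose fields are equal
 * (for example, the same partition values and bucket) always go to the same channel.
 */
public class HashRowPartitionerSketch {

    private final int numberOfChannels;

    public HashRowPartitionerSketch(int numberOfChannels) {
        if (numberOfChannels <= 0) {
            throw new IllegalArgumentException("numberOfChannels must be positive");
        }
        this.numberOfChannels = numberOfChannels;
    }

    /** Picks a channel in [0, numberOfChannels) from the hash of all row fields. */
    public int selectChannel(List<?> rowFields) {
        // List.hashCode() combines the element hashes in order; floorMod keeps the
        // result non-negative even when the combined hash is negative.
        return Math.floorMod(rowFields.hashCode(), numberOfChannels);
    }
}
```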





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #435: [FLINK-30211] Introduce CompactorSink for compact jobs in Table Store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #435:
URL: https://github.com/apache/flink-table-store/pull/435#discussion_r1047076834


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/AbstractFlinkSink.java:
##########
@@ -158,4 +125,10 @@ private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
                         + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                         + " to exactly-once");
     }
+
+    protected abstract OneInputStreamOperator<RowData, Committable> createWriteOperator(

Review Comment:
   `createWriteOperator(StoreSinkWrite.Provider writeProvider, boolean isStreaming);`
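
The suggested signature passes a factory for the writer instead of the initial commit user, so one write operator class can serve both regular and compact-only sinks. The sketch below illustrates that shape in plain Java. `StoreSinkWrite` and `Provider` here are stand-ins, not the real interfaces; the `provide(String commitUser)` parameter, and the assumption that the commit user is supplied to the operator separately, are hypothetical.

```java
/** Hypothetical sketch: the write operator receives a factory instead of building the writer. */
public class WriteProviderSketch {

    /** Stand-in for the writer the operator delegates to (illustrative, single method). */
    public interface StoreSinkWrite {
        String describe();
    }

    /** Assumed factory shape; the real Provider's parameters may differ. */
    @FunctionalInterface
    public interface Provider {
        StoreSinkWrite provide(String commitUser);
    }

    /** One write operator class serves every sink; the provider decides the writer. */
    public static final class WriteOperator {
        private final StoreSinkWrite write;
        private final boolean isStreaming;

        public WriteOperator(Provider writeProvider, String commitUser, boolean isStreaming) {
            this.write = writeProvider.provide(commitUser);
            this.isStreaming = isStreaming;
        }

        public String describe() {
            return write.describe() + (isStreaming ? " (streaming)" : " (batch)");
        }
    }
}
```

With this shape, a regular sink and a compact job differ only in the lambda they pass as the provider.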



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/AbstractFlinkSink.java:
##########
@@ -33,75 +33,45 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.Map;
 import java.util.UUID;
 
-/** Sink of dynamic store. */
-public class StoreSink implements Serializable {
+/** Abstract sink of table store. */
+public abstract class AbstractFlinkSink implements Serializable {

Review Comment:
   Maybe just `FlinkSink`, for consistency with `FlinkSource`.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/AbstractFlinkSink.java:
##########
@@ -158,4 +125,10 @@ private void assertCheckpointConfiguration(StreamExecutionEnvironment env) {
                         + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                         + " to exactly-once");
     }
+
+    protected abstract OneInputStreamOperator<RowData, Committable> createWriteOperator(
+            String initialCommitUser, boolean isStreaming);
+
+    protected abstract OneInputStreamOperator<Committable, Committable> createCommitterOperator(

Review Comment:
   Here we could just return a `CommitRetry`: let subclasses return different ones and keep the common part in the base class.


