Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:01:30 UTC

[GitHub] [hudi] danny0405 commented on a change in pull request #3100: [HUDI-2034] Support explicit partition compaction strategy for flink …

danny0405 commented on a change in pull request #3100:
URL: https://github.com/apache/hudi/pull/3100#discussion_r654179459



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/ExplicitPartitionCompactionStrategy.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Compaction strategy that compacts an explicitly configured partition, based on the {@link LogFileSizeBasedCompactionStrategy}.
+ */
+public class ExplicitPartitionCompactionStrategy extends LogFileSizeBasedCompactionStrategy {
+
+  @Override
+  public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, FileSlice slice) {
+    return super.captureMetrics(writeConfig, slice);
+  }
+
+  @Override
+  public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    return super.generateCompactionPlan(writeConfig, operations, pendingCompactionPlans);
+  }
+
+  @Override
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    return super.orderAndFilter(writeConfig, operations, pendingCompactionPlans);
+  }
+
+  @Override
+  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
+    String itemPartition = writeConfig.getProps().getProperty("compaction.partition");
+    List<String> filteredPartitionPaths = new ArrayList<>();

Review comment:
       Use `FlinkOptions.COMPACTION_PARTITION.key()`
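       For illustration, a minimal sketch of what the suggested change might look like.
       `COMPACTION_PARTITION_KEY` below is a hypothetical placeholder; the idea is to resolve
       the key via `FlinkOptions.COMPACTION_PARTITION.key()` (or an equivalent constant that is
       visible to this module) instead of hard-coding the string:

           // Hypothetical placeholder for the configured key; resolve it from
           // FlinkOptions.COMPACTION_PARTITION.key() or an equivalent shared constant.
           private static final String COMPACTION_PARTITION_KEY = "compaction.partition";

           @Override
           public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
             // Keep only the partition that was explicitly configured for compaction.
             String targetPartition = writeConfig.getProps().getProperty(COMPACTION_PARTITION_KEY);
             List<String> filteredPartitionPaths = new ArrayList<>();
             for (String partitionPath : allPartitionPaths) {
               if (partitionPath.equals(targetPartition)) {
                 filteredPartitionPaths.add(partitionPath);
               }
             }
             return filteredPartitionPaths;
           }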

##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
##########
@@ -239,6 +239,88 @@ public void testHoodieFlinkCompactor() throws Exception {
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @Test
+  public void testHoodieFlinkCompactorForSpecifiedPartition() throws Exception {
+    // Create the hoodie table and insert data into it.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+    tableEnv.executeSql(insertInto).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Build the configuration and set the Avro schema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    cfg.compactionPartition = "par1";
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // compute the compaction instant time and schedule the compaction
+    String instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    // the last instant takes the highest priority.
+    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
+    String compactionInstantTime = compactionInstant.get().getTimestamp();
+
+    // generate compaction plan
+    // should support configurable commit metadata
+    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
+            table.getMetaClient(), compactionInstantTime);

Review comment:
       Modify the logic to keep it in sync with `HoodieFlinkCompactor`.
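       In addition, once the plan is fetched it may be worth asserting that it only touches
       the configured partition. A minimal sketch, assuming the Avro getters `getOperations()`
       and `getPartitionPath()` on the compaction plan model and JUnit 5's `assertTrue`:

           // Every operation in the plan should target the explicitly configured
           // partition (cfg.compactionPartition = "par1" above).
           assertTrue(compactionPlan.getOperations().stream()
                   .allMatch(op -> "par1".equals(op.getPartitionPath())),
               "compaction plan should only contain operations for partition 'par1'");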




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org