Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/17 13:12:16 UTC
[hudi] branch master updated: [HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 64bf871cfc3 [HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418)
64bf871cfc3 is described below
commit 64bf871cfc3cfc08478cf04e02d2f7086f72548e
Author: voonhous <vo...@gmail.com>
AuthorDate: Mon Apr 17 21:12:06 2023 +0800
[HUDI-6052] Standardise TIMESTAMP(6) format when writing to Parquet files (#8418)
Co-authored-by: hbgstc123 <hb...@gmail.com>
---
.../row/HoodieRowDataParquetWriteSupport.java | 2 +-
.../storage/row/parquet/ParquetRowDataWriter.java | 63 ++++++--
.../row/parquet/ParquetSchemaConverter.java | 10 +-
.../row/parquet/TestParquetSchemaConverter.java | 2 +-
.../org/apache/hudi/util/AvroSchemaConverter.java | 4 +-
.../apache/hudi/util/AvroToRowDataConverters.java | 2 +-
.../sink/cluster/ITTestHoodieFlinkClustering.java | 180 +++++++++++++++++++++
.../vector/reader/Int64TimestampColumnReader.java | 2 +-
.../vector/reader/Int64TimestampColumnReader.java | 2 +-
.../vector/reader/Int64TimestampColumnReader.java | 2 +-
.../vector/reader/Int64TimestampColumnReader.java | 2 +-
11 files changed, 244 insertions(+), 27 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
index b939498c3e2..4a3109db60a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -20,11 +20,11 @@ package org.apache.hudi.io.storage.row;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.util.Option;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.common.util.Option;
import org.apache.parquet.hadoop.api.WriteSupport;
import java.nio.charset.StandardCharsets;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
index 3d9524eaa30..e5b9509d879 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetRowDataWriter.java
@@ -18,6 +18,8 @@
package org.apache.hudi.io.storage.row.parquet;
+import org.apache.hudi.common.util.ValidationUtils;
+
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.MapData;
@@ -124,17 +126,19 @@ public class ParquetRowDataWriter {
return new DoubleWriter();
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) t;
- if (timestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
+ final int tsPrecision = timestampType.getPrecision();
+ if (tsPrecision == 3 || tsPrecision == 6) {
+ return new Timestamp64Writer(tsPrecision);
} else {
- return new Timestamp96Writer(timestampType.getPrecision());
+ return new Timestamp96Writer(tsPrecision);
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) t;
- if (localZonedTimestampType.getPrecision() == 3) {
- return new Timestamp64Writer();
+ final int tsLtzPrecision = localZonedTimestampType.getPrecision();
+ if (tsLtzPrecision == 3 || tsLtzPrecision == 6) {
+ return new Timestamp64Writer(tsLtzPrecision);
} else {
- return new Timestamp96Writer(localZonedTimestampType.getPrecision());
+ return new Timestamp96Writer(tsLtzPrecision);
}
case ARRAY:
ArrayType arrayType = (ArrayType) t;
@@ -284,33 +288,64 @@ public class ParquetRowDataWriter {
}
/**
- * Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
+ * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedTypes of TIMESTAMP with MILLIS and MICROS
+ * precision, respectively. See
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
- * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
*/
private class Timestamp64Writer implements FieldWriter {
- private Timestamp64Writer() {
+ private final int precision;
+ private Timestamp64Writer(int precision) {
+ ValidationUtils.checkArgument(precision == 3 || precision == 6,
+ "Timestamp64Writer is only able to support precisions of {3, 6}");
+ this.precision = precision;
}
@Override
public void write(RowData row, int ordinal) {
- recordConsumer.addLong(timestampToInt64(row.getTimestamp(ordinal, 3)));
+ TimestampData timestampData = row.getTimestamp(ordinal, precision);
+ recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
@Override
public void write(ArrayData array, int ordinal) {
- recordConsumer.addLong(timestampToInt64(array.getTimestamp(ordinal, 3)));
+ TimestampData timestampData = array.getTimestamp(ordinal, precision);
+ recordConsumer.addLong(timestampToInt64(timestampData, precision));
}
}
- private long timestampToInt64(TimestampData timestampData) {
- return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
+ /**
+ * Converts a {@code TimestampData} to its corresponding int64 value. This function only accepts TimestampData of
+ * precision 3 or 6; special handling is required for precision 6.
+ * <p>
+ * For example, a TimestampData representing `1970-01-01T00:00:03.100001` at precision 6 will have:
+ * <ul>
+ * <li>millisecond = 3100</li>
+ * <li>nanoOfMillisecond = 1000</li>
+ * </ul>
+ * As such, the int64 value will be:
+ * <p>
+ * millisecond * 1000 + nanoOfMillisecond / 1000
+ *
+ * @param timestampData TimestampData to be converted to int64 format
+ * @param precision the precision of the TimestampData
+ * @return int64 value of the TimestampData
+ */
+ private long timestampToInt64(TimestampData timestampData, int precision) {
+ if (precision == 3) {
+ return utcTimestamp ? timestampData.getMillisecond() : timestampData.toTimestamp().getTime();
+ } else {
+ // using an else clause here as precision has been validated to be {3, 6} in the constructor
+ // convert timestampData to microseconds format
+ return utcTimestamp ? timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000 :
+ timestampData.toTimestamp().getTime() * 1000;
+ }
}
/**
* Timestamp of INT96 bytes, julianDay(4) + nanosOfDay(8). See
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
- * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
+ * <p>
+ * TODO: Leaving this here as there might be a requirement to support TIMESTAMP(9) in the future
*/
private class Timestamp96Writer implements FieldWriter {
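For reference, the precision-6 branch of timestampToInt64 can be exercised on its own. Below is a minimal sketch assuming Flink's TimestampData API; the wrapper class name is hypothetical and not part of this commit:

    import org.apache.flink.table.data.TimestampData;
    import java.time.LocalDateTime;

    public class TimestampMicrosSketch {
      // Mirrors the UTC precision-6 branch above: millis * 1000 + nanosOfMilli / 1000.
      static long toMicros(TimestampData ts) {
        return ts.getMillisecond() * 1000 + ts.getNanoOfMillisecond() / 1000;
      }

      public static void main(String[] args) {
        TimestampData ts = TimestampData.fromLocalDateTime(
            LocalDateTime.parse("1970-01-01T00:00:03.100001"));
        // millisecond = 3100, nanoOfMillisecond = 1000 -> 3100 * 1000 + 1000 / 1000
        System.out.println(toMicros(ts)); // prints 3100001
      }
    }

This reproduces the worked example in the Javadoc above: 3100 * 1000 + 1000 / 1000 = 3100001 microseconds.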
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5fb76f9418c..7cd7c300670 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -600,9 +600,10 @@ public class ParquetSchemaConverter {
.named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
- if (timestampType.getPrecision() == 3) {
+ if (timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6) {
+ TimeUnit timeUnit = timestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
- .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
+ .as(LogicalTypeAnnotation.timestampType(true, timeUnit))
.named(name);
} else {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
@@ -610,9 +611,10 @@ public class ParquetSchemaConverter {
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
- if (localZonedTimestampType.getPrecision() == 3) {
+ if (localZonedTimestampType.getPrecision() == 3 || localZonedTimestampType.getPrecision() == 6) {
+ TimeUnit timeUnit = localZonedTimestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
- .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
+ .as(LogicalTypeAnnotation.timestampType(false, timeUnit))
.named(name);
} else {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
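For reference, the INT64/MICROS schema that the converter now emits for TIMESTAMP(6) can be built directly; a minimal sketch assuming parquet-mr's Types builder and LogicalTypeAnnotation API (the wrapper class name is hypothetical):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class Timestamp6SchemaSketch {
      public static void main(String[] args) {
        // TIMESTAMP(6) now maps to an int64 annotated with TIMESTAMP(MICROS,true)
        // rather than the legacy int96 encoding.
        Type ts6 = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
            .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
            .named("ts_6");
        System.out.println(ts6); // optional int64 ts_6 (TIMESTAMP(MICROS,true))
      }
    }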
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index a1a07a65f99..3d5012b73b3 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -83,7 +83,7 @@ public class TestParquetSchemaConverter {
assertThat(messageType.getColumns().size(), is(3));
final String expected = "message converted {\n"
+ " optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
- + " optional int96 ts_6;\n"
+ + " optional int64 ts_6 (TIMESTAMP(MICROS,true));\n"
+ " optional int96 ts_9;\n"
+ "}\n";
assertThat(messageType.toString(), is(expected));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 44253e37329..6e4c3ec6137 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -256,7 +256,7 @@ public class AvroSchemaConverter {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
@@ -273,7 +273,7 @@ public class AvroSchemaConverter {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
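For reference, precisions 4..6 presumably map onto Avro's long-backed timestamp-micros logical type (precision <= 3 keeps timestamp-millis). A minimal sketch of the micros case, assuming Avro's LogicalTypes and SchemaBuilder APIs (the wrapper class name is hypothetical):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class AvroTimestampMicrosSketch {
      public static void main(String[] args) {
        // A long field carrying the timestamp-micros logical type.
        Schema tsMicros = LogicalTypes.timestampMicros()
            .addToSchema(SchemaBuilder.builder().longType());
        System.out.println(tsMicros); // {"type":"long","logicalType":"timestamp-micros"}
      }
    }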
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 5c9988dc0b2..38633b8ad9e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -215,7 +215,7 @@ public class AvroToRowDataConverters {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
return avroObject -> {
final Instant instant;
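On the read path, an Avro timestamp-micros long has to be split into epoch seconds plus a nanosecond adjustment before it can become an Instant. One plausible decoding, as a minimal sketch (the converter's actual branching is outside this hunk; the wrapper class name is hypothetical):

    import java.time.Instant;

    public class MicrosToInstantSketch {
      static Instant fromMicros(long micros) {
        // floorDiv/floorMod keep pre-epoch (negative) values correct.
        return Instant.ofEpochSecond(
            Math.floorDiv(micros, 1_000_000L),
            Math.floorMod(micros, 1_000_000L) * 1_000L);
      }

      public static void main(String[] args) {
        System.out.println(fromMicros(3_100_001L)); // 1970-01-01T00:00:03.100001Z
      }
    }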
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 9fdc6fc8b10..095fe1d369e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -52,8 +52,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -68,8 +71,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -419,4 +424,179 @@ public class ITTestHoodieFlinkClustering {
.stream().anyMatch(fg -> fg.getSlices()
.stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant))));
}
+
+ /**
+ * Test to ensure that writing to a table with a TIMESTAMP(9) column
+ * throws a ValidationException.
+ */
+ @Test
+ public void testHoodieFlinkClusteringWithTimestampNanos() {
+ // create hoodie table and insert data
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+ options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");
+
+ // row schema
+ final DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(9)), // precombine field
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull();
+
+ final RowType rowType = (RowType) dataType.getLogicalType();
+ final List<String> fields = rowType.getFields().stream()
+ .map(RowType.RowField::asSummaryString).collect(Collectors.toList());
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL(
+ "t1", fields, options, true, "uuid", "partition");
+ TableResult tableResult = tableEnv.executeSql(hoodieTableDDL);
+
+ // insert rows with timestamps of nanosecond precision; timestamp(9)
+ final String insertSql = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001001','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001001','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001001','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001001','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001001','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001001','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001001','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001001','par4')";
+
+ assertThrows(ValidationException.class, () -> tableEnv.executeSql(insertSql),
+ "Avro does not support TIMESTAMP type with precision: 9, it only support precisions <= 6.");
+ }
+
+ @Test
+ public void testHoodieFlinkClusteringWithTimestampMicros() throws Exception {
+ // create hoodie table and insert data
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+ .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+
+ // use append mode
+ options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
+
+ // row schema
+ final DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)), // precombine field
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull();
+ final RowType rowType = (RowType) dataType.getLogicalType();
+ final List<String> fields = rowType.getFields().stream()
+ .map(RowType.RowField::asSummaryString).collect(Collectors.toList());
+
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL(
+ "t1", fields, options, true, "uuid", "partition");
+ tableEnv.executeSql(hoodieTableDDL);
+
+ // insert rows with timestamps of microsecond precision; timestamp(6)
+ final String insertSql = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01.100001','par1'),\n"
+ + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02.100001','par1'),\n"
+ + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03.100001','par2'),\n"
+ + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04.100001','par2'),\n"
+ + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05.100001','par3'),\n"
+ + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06.100001','par3'),\n"
+ + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07.100001','par4'),\n"
+ + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08.100001','par4')";
+ tableEnv.executeSql(insertSql).await();
+
+ // wait for the asynchronous commit to finish
+ TimeUnit.SECONDS.sleep(3);
+
+ // build the flink clustering configuration
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ // create metaClient
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set the table name
+ conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());
+
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ // compute the clustering instant time
+ // and schedule the clustering at that instant
+ String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+
+ HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+ boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+
+ assertTrue(scheduled, "The clustering plan should be scheduled");
+
+ // fetch the instant based on the configured execution sequence
+ table.getMetaClient().reloadActiveTimeline();
+ HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+
+ // generate clustering plan
+ // should support configurable commit metadata
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(), timeline.lastInstant().get());
+
+ HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
+
+ // Mark instant as clustering inflight
+ HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+ table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
+
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+ .name("clustering_source")
+ .uid("uid_clustering_source")
+ .rebalance()
+ .transform("clustering_task",
+ TypeInformation.of(ClusteringCommitEvent.class),
+ new ClusteringOperator(conf, rowType))
+ .setParallelism(clusteringPlan.getInputGroups().size());
+
+ ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+
+ dataStream
+ .addSink(new ClusteringCommitSink(conf))
+ .name("clustering_commit")
+ .uid("uid_clustering_commit")
+ .setParallelism(1);
+
+ env.execute("flink_hudi_clustering");
+
+ // test output
+ final Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1, id2,par1,id2,Stephen,33,2100001,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2, id4,par2,id4,Fabian,31,4100001,par2]");
+ expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3, id6,par3,id6,Emma,20,6100001,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4, id8,par4,id8,Han,56,8100001,par4]");
+ TestData.checkWrittenData(tempFile, expected, 4);
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 555853bda6b..70638a9c432 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 555853bda6b..70638a9c432 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 417b1155bbd..b44273b57ca 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
index 417b1155bbd..b44273b57ca 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
@@ -59,7 +59,7 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only support precisions <= 6.");
}
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
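The four Int64TimestampColumnReader copies (one per supported Flink version) decode these int64 values back into Flink's TimestampData. A minimal sketch of the micros decode, assuming TimestampData.fromEpochMillis; the readers additionally honor the utcTimestamp flag, which is omitted here, and the wrapper class name is hypothetical:

    import org.apache.flink.table.data.TimestampData;

    public class Int64MicrosDecodeSketch {
      static TimestampData decodeMicros(long micros) {
        long millis = Math.floorDiv(micros, 1000L);
        int nanosOfMilli = (int) Math.floorMod(micros, 1000L) * 1000;
        return TimestampData.fromEpochMillis(millis, nanosOfMilli);
      }

      public static void main(String[] args) {
        System.out.println(decodeMicros(3_100_001L)); // 1970-01-01T00:00:03.100001
      }
    }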